Skip to content

Commit

Permalink
refactor(event cache): a few AllEventsCache refactorings (#4471)
Browse files Browse the repository at this point in the history
I was investigating a potential deadlock with the event cache storage,
and only found a few places where to make the code a bit more idiomatic
and more readable.
  • Loading branch information
bnjbvr authored Jan 7, 2025
1 parent 8205da8 commit 2ef14de
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 121 deletions.
116 changes: 115 additions & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
events::{relation::RelationType, AnySyncEphemeralRoomEvent},
events::{
relation::RelationType,
room::{message::Relation, redaction::SyncRoomRedactionEvent},
AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
AnySyncTimelineEvent,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId, RoomId,
};
Expand Down Expand Up @@ -363,6 +368,115 @@ impl AllEventsCache {
self.events.clear();
self.relations.clear();
}

/// If the event is related to another one, its id is added to the relations
/// map.
fn append_related_event(&mut self, event: &SyncTimelineEvent) {
// Handle and cache events and relations.
let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
return;
};

// Handle redactions separately, as their logic is slightly different.
if let AnySyncMessageLikeEvent::RoomRedaction(SyncRoomRedactionEvent::Original(ev)) = &ev {
if let Some(redacted_event_id) = ev.content.redacts.as_ref().or(ev.redacts.as_ref()) {
self.relations
.entry(redacted_event_id.to_owned())
.or_default()
.insert(ev.event_id.to_owned(), RelationType::Replacement);
}
return;
}

let relationship = match ev.original_content() {
Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
if let Some(relation) = c.relates_to {
match relation {
Relation::Replacement(replacement) => {
Some((replacement.event_id, RelationType::Replacement))
}
Relation::Reply { in_reply_to } => {
Some((in_reply_to.event_id, RelationType::Reference))
}
Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
// Do nothing for custom
_ => None,
}
} else {
None
}
}
Some(AnyMessageLikeEventContent::PollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::PollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::Reaction(c)) => {
Some((c.relates_to.event_id, RelationType::Annotation))
}
_ => None,
};

if let Some(relationship) = relationship {
self.relations
.entry(relationship.0)
.or_default()
.insert(ev.event_id().to_owned(), relationship.1);
}
}

/// Looks for related event ids for the passed event id, and appends them to
/// the `results` parameter. Then it'll recursively get the related
/// event ids for those too.
fn collect_related_events(
&self,
event_id: &EventId,
filter: Option<&[RelationType]>,
) -> Vec<SyncTimelineEvent> {
let mut results = Vec::new();
self.collect_related_events_rec(event_id, filter, &mut results);
results
}

fn collect_related_events_rec(
&self,
event_id: &EventId,
filter: Option<&[RelationType]>,
results: &mut Vec<SyncTimelineEvent>,
) {
let Some(related_event_ids) = self.relations.get(event_id) else {
return;
};

for (related_event_id, relation_type) in related_event_ids {
if let Some(filter) = filter {
if !filter.contains(relation_type) {
continue;
}
}

// If the event was already added to the related ones, skip it.
if results.iter().any(|event| {
event.event_id().is_some_and(|added_related_event_id| {
added_related_event_id == *related_event_id
})
}) {
continue;
}

if let Some((_, ev)) = self.events.get(related_event_id) {
results.push(ev.clone());
self.collect_related_events_rec(related_event_id, filter, results);
}
}
}
}

struct EventCacheInner {
Expand Down
129 changes: 9 additions & 120 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,13 @@ use matrix_sdk_base::{
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
events::{
relation::RelationType,
room::{message::Relation, redaction::SyncRoomRedactionEvent},
AnyMessageLikeEventContent, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent,
AnySyncMessageLikeEvent, AnySyncTimelineEvent,
},
events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
broadcast::{Receiver, Sender},
Notify, RwLock, RwLockReadGuard, RwLockWriteGuard,
Notify, RwLock,
};
use tracing::{debug, trace, warn};

Expand Down Expand Up @@ -118,12 +113,10 @@ impl RoomEventCache {
event_id: &EventId,
filter: Option<Vec<RelationType>>,
) -> Option<(SyncTimelineEvent, Vec<SyncTimelineEvent>)> {
let mut relation_events = Vec::new();

let cache = self.inner.all_events.read().await;
if let Some((_, event)) = cache.events.get(event_id) {
Self::collect_related_events(&cache, event_id, &filter, &mut relation_events);
Some((event.clone(), relation_events))
let related_events = cache.collect_related_events(event_id, filter.as_deref());
Some((event.clone(), related_events))
} else {
None
}
Expand All @@ -150,39 +143,6 @@ impl RoomEventCache {
Ok(())
}

/// Looks for related event ids for the passed event id, and appends them to
/// the `results` parameter. Then it'll recursively get the related
/// event ids for those too.
fn collect_related_events(
cache: &RwLockReadGuard<'_, AllEventsCache>,
event_id: &EventId,
filter: &Option<Vec<RelationType>>,
results: &mut Vec<SyncTimelineEvent>,
) {
if let Some(related_event_ids) = cache.relations.get(event_id) {
for (related_event_id, relation_type) in related_event_ids {
if let Some(filter) = filter {
if !filter.contains(relation_type) {
continue;
}
}

// If the event was already added to the related ones, skip it.
if results.iter().any(|e| {
e.event_id().is_some_and(|added_related_event_id| {
added_related_event_id == *related_event_id
})
}) {
continue;
}
if let Some((_, ev)) = cache.events.get(related_event_id) {
results.push(ev.clone());
Self::collect_related_events(cache, related_event_id, filter, results);
}
}
}
}

/// Save a single event in the event cache, for further retrieval with
/// [`Self::event`].
// TODO: This doesn't insert the event into the linked chunk. In the future
Expand All @@ -192,7 +152,7 @@ impl RoomEventCache {
if let Some(event_id) = event.event_id() {
let mut cache = self.inner.all_events.write().await;

self.inner.append_related_event(&mut cache, &event);
cache.append_related_event(&event);
cache.events.insert(event_id, (self.inner.room_id.clone(), event));
} else {
warn!("couldn't save event without event id in the event cache");
Expand All @@ -209,7 +169,7 @@ impl RoomEventCache {
let mut cache = self.inner.all_events.write().await;
for event in events {
if let Some(event_id) = event.event_id() {
self.inner.append_related_event(&mut cache, &event);
cache.append_related_event(&event);
cache.events.insert(event_id, (self.inner.room_id.clone(), event));
} else {
warn!("couldn't save event without event id in the event cache");
Expand All @@ -235,9 +195,9 @@ pub(super) struct RoomEventCacheInner {
/// State for this room's event cache.
pub state: RwLock<RoomEventCacheState>,

/// See comment of [`EventCacheInner::all_events`].
/// See comment of [`super::EventCacheInner::all_events`].
///
/// This is shared between the [`EventCacheInner`] singleton and all
/// This is shared between the [`super::EventCacheInner`] singleton and all
/// [`RoomEventCacheInner`] instances.
all_events: Arc<RwLock<AllEventsCache>>,

Expand Down Expand Up @@ -441,77 +401,6 @@ impl RoomEventCacheInner {
.await
}

/// If the event is related to another one, its id is added to the
/// relations map.
fn append_related_event(
&self,
cache: &mut RwLockWriteGuard<'_, AllEventsCache>,
event: &SyncTimelineEvent,
) {
// Handle and cache events and relations.
if let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() {
// Handle redactions separately, as their logic is slightly different.
if let AnySyncMessageLikeEvent::RoomRedaction(SyncRoomRedactionEvent::Original(ev)) =
&ev
{
if let Some(redacted_event_id) = ev.content.redacts.as_ref().or(ev.redacts.as_ref())
{
cache
.relations
.entry(redacted_event_id.to_owned())
.or_default()
.insert(ev.event_id.to_owned(), RelationType::Replacement);
}
} else {
let relationship = match ev.original_content() {
Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
if let Some(relation) = c.relates_to {
match relation {
Relation::Replacement(replacement) => {
Some((replacement.event_id, RelationType::Replacement))
}
Relation::Reply { in_reply_to } => {
Some((in_reply_to.event_id, RelationType::Reference))
}
Relation::Thread(thread) => {
Some((thread.event_id, RelationType::Thread))
}
// Do nothing for custom
_ => None,
}
} else {
None
}
}
Some(AnyMessageLikeEventContent::PollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::PollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::Reaction(c)) => {
Some((c.relates_to.event_id, RelationType::Annotation))
}
_ => None,
};

if let Some(relationship) = relationship {
cache
.relations
.entry(relationship.0)
.or_default()
.insert(ev.event_id().to_owned(), relationship.1);
}
}
}
}

/// Append a set of events and associated room data.
///
/// This is a private implementation. It must not be exposed publicly.
Expand Down Expand Up @@ -563,7 +452,7 @@ impl RoomEventCacheInner {

for sync_timeline_event in &sync_timeline_events {
if let Some(event_id) = sync_timeline_event.event_id() {
self.append_related_event(&mut all_events, sync_timeline_event);
all_events.append_related_event(sync_timeline_event);
all_events.events.insert(
event_id.to_owned(),
(self.room_id.clone(), sync_timeline_event.clone()),
Expand Down

0 comments on commit 2ef14de

Please sign in to comment.