Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(event cache): a few AllEventsCache refactorings #4471

Merged
merged 8 commits into from
Jan 7, 2025
116 changes: 115 additions & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,12 @@ use matrix_sdk_common::executor::{spawn, JoinHandle};
use once_cell::sync::OnceCell;
use room::RoomEventCacheState;
use ruma::{
events::{relation::RelationType, AnySyncEphemeralRoomEvent},
events::{
relation::RelationType,
room::{message::Relation, redaction::SyncRoomRedactionEvent},
AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
AnySyncTimelineEvent,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId, RoomId,
};
Expand Down Expand Up @@ -363,6 +368,115 @@ impl AllEventsCache {
self.events.clear();
self.relations.clear();
}

/// If the event is related to another one, its id is added to the relations
/// map.
fn append_related_event(&mut self, event: &SyncTimelineEvent) {
// Handle and cache events and relations.
let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
return;
};

// Handle redactions separately, as their logic is slightly different.
if let AnySyncMessageLikeEvent::RoomRedaction(SyncRoomRedactionEvent::Original(ev)) = &ev {
if let Some(redacted_event_id) = ev.content.redacts.as_ref().or(ev.redacts.as_ref()) {
self.relations
.entry(redacted_event_id.to_owned())
.or_default()
.insert(ev.event_id.to_owned(), RelationType::Replacement);
}
return;
}

let relationship = match ev.original_content() {
Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
if let Some(relation) = c.relates_to {
match relation {
Relation::Replacement(replacement) => {
Some((replacement.event_id, RelationType::Replacement))
}
Relation::Reply { in_reply_to } => {
Some((in_reply_to.event_id, RelationType::Reference))
}
Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
// Do nothing for custom
_ => None,
}
} else {
None
}
}
Some(AnyMessageLikeEventContent::PollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::PollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::Reaction(c)) => {
Some((c.relates_to.event_id, RelationType::Annotation))
}
_ => None,
};

if let Some(relationship) = relationship {
self.relations
.entry(relationship.0)
.or_default()
.insert(ev.event_id().to_owned(), relationship.1);
}
}

/// Looks for related event ids for the passed event id, and appends them to
/// the `results` parameter. Then it'll recursively get the related
/// event ids for those too.
fn collect_related_events(
&self,
event_id: &EventId,
filter: Option<&[RelationType]>,
) -> Vec<SyncTimelineEvent> {
let mut results = Vec::new();
self.collect_related_events_rec(event_id, filter, &mut results);
results
}

fn collect_related_events_rec(
&self,
event_id: &EventId,
filter: Option<&[RelationType]>,
results: &mut Vec<SyncTimelineEvent>,
) {
let Some(related_event_ids) = self.relations.get(event_id) else {
return;
};

for (related_event_id, relation_type) in related_event_ids {
if let Some(filter) = filter {
if !filter.contains(relation_type) {
continue;
}
}

// If the event was already added to the related ones, skip it.
if results.iter().any(|event| {
event.event_id().is_some_and(|added_related_event_id| {
added_related_event_id == *related_event_id
})
}) {
continue;
}

if let Some((_, ev)) = self.events.get(related_event_id) {
results.push(ev.clone());
self.collect_related_events_rec(related_event_id, filter, results);
}
}
}
}

struct EventCacheInner {
Expand Down
129 changes: 9 additions & 120 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,13 @@ use matrix_sdk_base::{
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
events::{
relation::RelationType,
room::{message::Relation, redaction::SyncRoomRedactionEvent},
AnyMessageLikeEventContent, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent,
AnySyncMessageLikeEvent, AnySyncTimelineEvent,
},
events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
broadcast::{Receiver, Sender},
Notify, RwLock, RwLockReadGuard, RwLockWriteGuard,
Notify, RwLock,
};
use tracing::{debug, trace, warn};

Expand Down Expand Up @@ -118,12 +113,10 @@ impl RoomEventCache {
event_id: &EventId,
filter: Option<Vec<RelationType>>,
) -> Option<(SyncTimelineEvent, Vec<SyncTimelineEvent>)> {
let mut relation_events = Vec::new();

let cache = self.inner.all_events.read().await;
if let Some((_, event)) = cache.events.get(event_id) {
Self::collect_related_events(&cache, event_id, &filter, &mut relation_events);
Some((event.clone(), relation_events))
let related_events = cache.collect_related_events(event_id, filter.as_deref());
Some((event.clone(), related_events))
} else {
None
}
Expand All @@ -150,39 +143,6 @@ impl RoomEventCache {
Ok(())
}

/// Looks for related event ids for the passed event id, and appends them to
/// the `results` parameter. Then it'll recursively get the related
/// event ids for those too.
fn collect_related_events(
cache: &RwLockReadGuard<'_, AllEventsCache>,
event_id: &EventId,
filter: &Option<Vec<RelationType>>,
results: &mut Vec<SyncTimelineEvent>,
) {
if let Some(related_event_ids) = cache.relations.get(event_id) {
for (related_event_id, relation_type) in related_event_ids {
if let Some(filter) = filter {
if !filter.contains(relation_type) {
continue;
}
}

// If the event was already added to the related ones, skip it.
if results.iter().any(|e| {
e.event_id().is_some_and(|added_related_event_id| {
added_related_event_id == *related_event_id
})
}) {
continue;
}
if let Some((_, ev)) = cache.events.get(related_event_id) {
results.push(ev.clone());
Self::collect_related_events(cache, related_event_id, filter, results);
}
}
}
}

/// Save a single event in the event cache, for further retrieval with
/// [`Self::event`].
// TODO: This doesn't insert the event into the linked chunk. In the future
Expand All @@ -192,7 +152,7 @@ impl RoomEventCache {
if let Some(event_id) = event.event_id() {
let mut cache = self.inner.all_events.write().await;

self.inner.append_related_event(&mut cache, &event);
cache.append_related_event(&event);
cache.events.insert(event_id, (self.inner.room_id.clone(), event));
} else {
warn!("couldn't save event without event id in the event cache");
Expand All @@ -209,7 +169,7 @@ impl RoomEventCache {
let mut cache = self.inner.all_events.write().await;
for event in events {
if let Some(event_id) = event.event_id() {
self.inner.append_related_event(&mut cache, &event);
cache.append_related_event(&event);
cache.events.insert(event_id, (self.inner.room_id.clone(), event));
} else {
warn!("couldn't save event without event id in the event cache");
Expand All @@ -235,9 +195,9 @@ pub(super) struct RoomEventCacheInner {
/// State for this room's event cache.
pub state: RwLock<RoomEventCacheState>,

/// See comment of [`EventCacheInner::all_events`].
/// See comment of [`super::EventCacheInner::all_events`].
///
/// This is shared between the [`EventCacheInner`] singleton and all
/// This is shared between the [`super::EventCacheInner`] singleton and all
/// [`RoomEventCacheInner`] instances.
all_events: Arc<RwLock<AllEventsCache>>,

Expand Down Expand Up @@ -441,77 +401,6 @@ impl RoomEventCacheInner {
.await
}

/// If the event is related to another one, its id is added to the
/// relations map.
fn append_related_event(
&self,
cache: &mut RwLockWriteGuard<'_, AllEventsCache>,
event: &SyncTimelineEvent,
) {
// Handle and cache events and relations.
if let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() {
// Handle redactions separately, as their logic is slightly different.
if let AnySyncMessageLikeEvent::RoomRedaction(SyncRoomRedactionEvent::Original(ev)) =
&ev
{
if let Some(redacted_event_id) = ev.content.redacts.as_ref().or(ev.redacts.as_ref())
{
cache
.relations
.entry(redacted_event_id.to_owned())
.or_default()
.insert(ev.event_id.to_owned(), RelationType::Replacement);
}
} else {
let relationship = match ev.original_content() {
Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
if let Some(relation) = c.relates_to {
match relation {
Relation::Replacement(replacement) => {
Some((replacement.event_id, RelationType::Replacement))
}
Relation::Reply { in_reply_to } => {
Some((in_reply_to.event_id, RelationType::Reference))
}
Relation::Thread(thread) => {
Some((thread.event_id, RelationType::Thread))
}
// Do nothing for custom
_ => None,
}
} else {
None
}
}
Some(AnyMessageLikeEventContent::PollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::PollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
Some((c.relates_to.event_id, RelationType::Reference))
}
Some(AnyMessageLikeEventContent::Reaction(c)) => {
Some((c.relates_to.event_id, RelationType::Annotation))
}
_ => None,
};

if let Some(relationship) = relationship {
cache
.relations
.entry(relationship.0)
.or_default()
.insert(ev.event_id().to_owned(), relationship.1);
}
}
}
}

/// Append a set of events and associated room data.
///
/// This is a private implementation. It must not be exposed publicly.
Expand Down Expand Up @@ -563,7 +452,7 @@ impl RoomEventCacheInner {

for sync_timeline_event in &sync_timeline_events {
if let Some(event_id) = sync_timeline_event.event_id() {
self.append_related_event(&mut all_events, sync_timeline_event);
all_events.append_related_event(sync_timeline_event);
all_events.events.insert(
event_id.to_owned(),
(self.room_id.clone(), sync_timeline_event.clone()),
Expand Down
Loading