From 05d46e6027074a501b1810a5cab06131d2921769 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20Mart=C3=ADn?= Date: Fri, 13 Dec 2024 12:22:02 +0100 Subject: [PATCH] Rename `JoinRequest` in the SDK crates to `KnockRequest`, make `Room::mark_knock_requests_as_seen` thread safe and pass `user_ids` instead of `event_ids`: the user ids will be used to get the related member state events and they'll only be marked as seen if they're in a knock state. Also, add extra checks to the integration tests. --- bindings/matrix-sdk-ffi/src/room.rs | 46 ++-- crates/matrix-sdk-base/src/rooms/normal.rs | 82 ++++++ .../matrix-sdk-base/src/store/memory_store.rs | 2 +- crates/matrix-sdk-base/src/store/traits.rs | 4 +- .../src/state_store/mod.rs | 4 +- crates/matrix-sdk-sqlite/src/state_store.rs | 4 +- .../{request_to_join.rs => knock_requests.rs} | 85 +++--- crates/matrix-sdk/src/room/mod.rs | 256 ++++++++---------- .../tests/integration/room/joined.rs | 73 ++--- 9 files changed, 309 insertions(+), 247 deletions(-) rename crates/matrix-sdk/src/room/{request_to_join.rs => knock_requests.rs} (65%) diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index 6cf0a53e9e3..c1233b51151 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -912,17 +912,17 @@ impl Room { Ok(()) } - /// Subscribes to requests to join this room, using a `listener` to be - /// notified of the changes. + /// Subscribes to requests to join this room (knock member events), using a + /// `listener` to be notified of the changes. /// /// The current requests to join the room will be emitted immediately /// when subscribing, along with a [`TaskHandle`] to cancel the /// subscription. - pub async fn subscribe_to_join_requests( + pub async fn subscribe_to_knock_requests( self: Arc, - listener: Box, + listener: Box, ) -> Result, ClientError> { - let stream = self.inner.subscribe_to_join_requests().await?; + let stream = self.inner.subscribe_to_knock_requests().await?; let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move { pin_mut!(stream); @@ -935,8 +935,8 @@ impl Room { } } -impl From for JoinRequest { - fn from(request: matrix_sdk::room::request_to_join::JoinRequest) -> Self { +impl From for KnockRequest { + fn from(request: matrix_sdk::room::knock_requests::KnockRequest) -> Self { Self { event_id: request.event_id.to_string(), user_id: request.member_info.user_id.to_string(), @@ -946,20 +946,20 @@ impl From for JoinRequest { reason: request.member_info.reason.clone(), timestamp: request.timestamp.map(|ts| ts.into()), is_seen: request.is_seen, - actions: Arc::new(JoinRequestActions { inner: request }), + actions: Arc::new(KnockRequestActions { inner: request }), } } } /// A listener for receiving new requests to a join a room. #[matrix_sdk_ffi_macros::export(callback_interface)] -pub trait JoinRequestsListener: Send + Sync { - fn call(&self, join_requests: Vec); +pub trait KnockRequestsListener: Send + Sync { + fn call(&self, join_requests: Vec); } /// An FFI representation of a request to join a room. #[derive(Debug, Clone, uniffi::Record)] -pub struct JoinRequest { +pub struct KnockRequest { /// The event id of the event that contains the `knock` membership change. pub event_id: String, /// The user id of the user who's requesting to join the room. @@ -974,44 +974,44 @@ pub struct JoinRequest { pub reason: Option, /// The timestamp when this request was created. pub timestamp: Option, - /// Whether the request to join has been marked as `seen` so it can be + /// Whether the knock request has been marked as `seen` so it can be /// filtered by the client. pub is_seen: bool, - /// A set of actions to perform for this request to join. - pub actions: Arc, + /// A set of actions to perform for this knock request. + pub actions: Arc, } -/// A set of actions to perform for a request to join. +/// A set of actions to perform for a knock request. #[derive(Debug, Clone, uniffi::Object)] -pub struct JoinRequestActions { - inner: matrix_sdk::room::request_to_join::JoinRequest, +pub struct KnockRequestActions { + inner: matrix_sdk::room::knock_requests::KnockRequest, } #[matrix_sdk_ffi_macros::export] -impl JoinRequestActions { - /// Accepts the request to join by inviting the user to the room. +impl KnockRequestActions { + /// Accepts the knock request by inviting the user to the room. pub async fn accept(&self) -> Result<(), ClientError> { self.inner.accept().await.map_err(Into::into) } - /// Declines the request to join by kicking the user from the room with an + /// Declines the knock request by kicking the user from the room with an /// optional reason. pub async fn decline(&self, reason: Option) -> Result<(), ClientError> { self.inner.decline(reason.as_deref()).await.map_err(Into::into) } - /// Declines the request to join by banning the user from the room with an + /// Declines the knock request by banning the user from the room with an /// optional reason. pub async fn decline_and_ban(&self, reason: Option) -> Result<(), ClientError> { self.inner.decline_and_ban(reason.as_deref()).await.map_err(Into::into) } - /// Marks the request as 'seen'. + /// Marks the knock request as 'seen'. /// /// **IMPORTANT**: this won't update the current reference to this request, /// a new one with the updated value should be emitted instead. pub async fn mark_as_seen(&self) -> Result<(), ClientError> { - self.inner.clone().mark_as_seen().await.map_err(Into::into) + self.inner.mark_as_seen().await.map_err(Into::into) } } diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 55d604591ba..a7a5d4ea983 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -1177,6 +1177,88 @@ impl Room { pub fn pinned_event_ids(&self) -> Option> { self.inner.read().pinned_event_ids() } + + /// Mark a list of requests to join the room as seen, given their state + /// event ids. + pub async fn mark_knock_requests_as_seen(&self, user_ids: &[OwnedUserId]) -> StoreResult<()> { + let raw_user_ids: Vec<&str> = user_ids.iter().map(|id| id.as_str()).collect(); + let member_raw_events = self + .store + .get_state_events_for_keys(self.room_id(), StateEventType::RoomMember, &raw_user_ids) + .await?; + let mut event_to_user_ids = Vec::with_capacity(member_raw_events.len()); + + // Map the list of events ids to their user ids, if they are event ids for knock + // membership events. Log an error and continue otherwise. + for raw_event in member_raw_events { + let event = raw_event.cast::().deserialize()?; + match event { + SyncOrStrippedState::Sync(SyncStateEvent::Original(event)) => { + if event.content.membership == MembershipState::Knock { + event_to_user_ids.push((event.event_id, event.state_key)) + } else { + warn!("Could not mark knock event as seen: event {} for user {} is not in Knock membership state.", event.event_id, event.state_key); + } + } + _ => warn!( + "Could not mark knock event as seen: event for user {} is not valid.", + event.state_key() + ), + } + } + + let mut current_seen_events_guard = self.seen_knock_request_ids_map.write().await; + // We're not calling `get_seen_join_request_ids` here because we need to keep + // the Mutex's guard until we've updated the data + let mut current_seen_events = if current_seen_events_guard.is_none() { + self.load_cached_knock_request_ids().await? + } else { + current_seen_events_guard.clone().unwrap() + }; + + current_seen_events.extend(event_to_user_ids); + + ObservableWriteGuard::set( + &mut current_seen_events_guard, + Some(current_seen_events.clone()), + ); + + self.store + .set_kv_data( + StateStoreDataKey::SeenKnockRequests(self.room_id()), + StateStoreDataValue::SeenKnockRequests(current_seen_events), + ) + .await?; + + Ok(()) + } + + /// Get the list of seen knock request event ids in this room. + pub async fn get_seen_knock_request_ids( + &self, + ) -> Result, StoreError> { + let mut guard = self.seen_knock_request_ids_map.write().await; + if guard.is_none() { + ObservableWriteGuard::set( + &mut guard, + Some(self.load_cached_knock_request_ids().await?), + ); + } + Ok(guard.clone().unwrap_or_default()) + } + + /// This loads the current list of seen knock request ids from the state + /// store. + async fn load_cached_knock_request_ids( + &self, + ) -> StoreResult> { + Ok(self + .store + .get_kv_data(StateStoreDataKey::SeenKnockRequests(self.room_id())) + .await? + .and_then(|v| v.into_seen_knock_requests()) + .unwrap_or_default()) + } } // See https://github.com/matrix-org/matrix-rust-sdk/pull/3749#issuecomment-2312939823. diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 4f98c42d4e8..9148c9b34da 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -232,7 +232,7 @@ impl StateStore for MemoryStore { inner.seen_knock_requests.insert( room_id.to_owned(), value - .into_seen_join_requests() + .into_seen_knock_requests() .expect("Session data is not a set of seen join request ids"), ); } diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index 8e2447d4866..5f651483f5b 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -1093,7 +1093,7 @@ impl StateStoreDataValue { } /// Get this value if it is the data for the ignored join requests. - pub fn into_seen_join_requests(self) -> Option> { + pub fn into_seen_knock_requests(self) -> Option> { as_variant!(self, Self::SeenKnockRequests) } } @@ -1126,7 +1126,7 @@ pub enum StateStoreDataKey<'a> { /// [`ComposerDraft`]: Self::ComposerDraft ComposerDraft(&'a RoomId), - /// A list of requests to join in a room marked as seen. + /// A list of knock request ids marked as seen in a room. SeenKnockRequests(&'a RoomId), } diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 8de8d22efd0..01d386f354f 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -583,8 +583,8 @@ impl_state_store!({ ), StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value( &value - .into_seen_join_requests() - .expect("Session data is not a set of seen join request ids"), + .into_seen_knock_requests() + .expect("Session data is not a set of seen knock request ids"), ), }; diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index a8a2e792961..adfd9d5b5a3 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -1037,8 +1037,8 @@ impl StateStore for SqliteStateStore { )?, StateStoreDataKey::SeenKnockRequests(_) => self.serialize_value( &value - .into_seen_join_requests() - .expect("Session data is not a set of seen join request ids"), + .into_seen_knock_requests() + .expect("Session data is not a set of seen knock request ids"), )?, }; diff --git a/crates/matrix-sdk/src/room/request_to_join.rs b/crates/matrix-sdk/src/room/knock_requests.rs similarity index 65% rename from crates/matrix-sdk/src/room/request_to_join.rs rename to crates/matrix-sdk/src/room/knock_requests.rs index ff1b8dfe5af..de1409f5086 100644 --- a/crates/matrix-sdk/src/room/request_to_join.rs +++ b/crates/matrix-sdk/src/room/knock_requests.rs @@ -19,24 +19,24 @@ use crate::{room::RoomMember, Error, Room}; /// A request to join a room with `knock` join rule. #[derive(Debug, Clone)] -pub struct JoinRequest { +pub struct KnockRequest { room: Room, /// The event id of the event containing knock membership change. pub event_id: OwnedEventId, /// The timestamp when this request was created. pub timestamp: Option, /// Some general room member info to display. - pub member_info: RequestToJoinMemberInfo, + pub member_info: KnockRequestMemberInfo, /// Whether it's been marked as 'seen' by the client. pub is_seen: bool, } -impl JoinRequest { +impl KnockRequest { pub(crate) fn new( room: &Room, event_id: &EventId, timestamp: Option, - member: RequestToJoinMemberInfo, + member: KnockRequestMemberInfo, is_seen: bool, ) -> Self { Self { @@ -48,30 +48,30 @@ impl JoinRequest { } } - /// The room id for the `Room` form whose access is requested. + /// The room id for the `Room` from whose access is requested. pub fn room_id(&self) -> &RoomId { self.room.room_id() } - /// Marks the request to join as 'seen' so the client can ignore it in the + /// Marks the knock request as 'seen' so the client can ignore it in the /// future. pub async fn mark_as_seen(&self) -> Result<(), Error> { - self.room.mark_join_requests_as_seen(&[self.event_id.to_owned()]).await?; + self.room.mark_knock_requests_as_seen(&[self.member_info.user_id.to_owned()]).await?; Ok(()) } - /// Accepts the request to join by inviting the user to the room. + /// Accepts the knock request by inviting the user to the room. pub async fn accept(&self) -> Result<(), Error> { self.room.invite_user_by_id(&self.member_info.user_id).await } - /// Declines the request to join by kicking the user from the room, with an + /// Declines the knock request by kicking the user from the room, with an /// optional reason. pub async fn decline(&self, reason: Option<&str>) -> Result<(), Error> { self.room.kick_user(&self.member_info.user_id, reason).await } - /// Declines the request to join by banning the user from the room, with an + /// Declines the knock request by banning the user from the room, with an /// optional reason. pub async fn decline_and_ban(&self, reason: Option<&str>) -> Result<(), Error> { self.room.ban_user(&self.member_info.user_id, reason).await @@ -80,7 +80,7 @@ impl JoinRequest { /// General room member info to display along with the join request. #[derive(Debug, Clone)] -pub struct RequestToJoinMemberInfo { +pub struct KnockRequestMemberInfo { /// The user id for the room member requesting access. pub user_id: OwnedUserId, /// The optional display name of the room member requesting access. @@ -91,8 +91,8 @@ pub struct RequestToJoinMemberInfo { pub reason: Option, } -impl From for RequestToJoinMemberInfo { - fn from(member: RoomMember) -> Self { +impl KnockRequestMemberInfo { + pub(crate) fn from_member(member: &RoomMember) -> Self { Self { user_id: member.user_id().to_owned(), display_name: member.display_name().map(ToOwned::to_owned), @@ -102,13 +102,18 @@ impl From for RequestToJoinMemberInfo { } } +// The http mocking library is not supported for wasm32 #[cfg(all(test, not(target_arch = "wasm32")))] mod tests { - use matrix_sdk_test::async_test; - use ruma::{event_id, owned_user_id, room_id, EventId}; + use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder}; + use ruma::{ + event_id, + events::room::member::{MembershipState, RoomMemberEventContent}, + owned_user_id, room_id, user_id, EventId, + }; use crate::{ - room::request_to_join::{JoinRequest, RequestToJoinMemberInfo}, + room::knock_requests::{KnockRequest, KnockRequestMemberInfo}, test_utils::mocks::MatrixMockServer, Room, }; @@ -119,19 +124,31 @@ mod tests { let client = server.client_builder().build().await; let room_id = room_id!("!a:b.c"); let event_id = event_id!("$a:b.c"); + let user_id = user_id!("@alice:b.c"); - let room = server.sync_joined_room(&client, room_id).await; + let f = EventFactory::new().room(room_id); + let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f + .event(RoomMemberEventContent::new(MembershipState::Knock)) + .event_id(event_id) + .sender(user_id) + .state_key(user_id) + .into_raw_timeline() + .cast()]); + let room = server.sync_room(&client, joined_room_builder).await; - let join_request = mock_join_request(&room, Some(event_id)); + let knock_request = make_knock_request(&room, Some(event_id)); - // When we mark the join request as seen - join_request.mark_as_seen().await.expect("Failed to mark as seen"); + // When we mark the knock request as seen + knock_request.mark_as_seen().await.expect("Failed to mark as seen"); // Then we can check it was successfully marked as seen from the room let seen_ids = - room.get_seen_join_request_ids().await.expect("Failed to get seen join request ids"); + room.get_seen_knock_request_ids().await.expect("Failed to get seen join request ids"); assert_eq!(seen_ids.len(), 1); - assert_eq!(seen_ids.into_iter().next().expect("Couldn't load next item"), event_id); + assert_eq!( + seen_ids.into_iter().next().expect("Couldn't load next item"), + (event_id.to_owned(), user_id.to_owned()) + ); } #[async_test] @@ -142,13 +159,13 @@ mod tests { let room = server.sync_joined_room(&client, room_id).await; - let join_request = mock_join_request(&room, None); + let knock_request = make_knock_request(&room, None); // The /invite endpoint must be called once server.mock_invite_user_by_id().ok().mock_once().mount().await; - // When we accept the join request - join_request.accept().await.expect("Failed to accept the request"); + // When we accept the knock request + knock_request.accept().await.expect("Failed to accept the request"); } #[async_test] @@ -159,13 +176,13 @@ mod tests { let room = server.sync_joined_room(&client, room_id).await; - let join_request = mock_join_request(&room, None); + let knock_request = make_knock_request(&room, None); // The /kick endpoint must be called once server.mock_kick_user().ok().mock_once().mount().await; - // When we decline the join request - join_request.decline(None).await.expect("Failed to decline the request"); + // When we decline the knock request + knock_request.decline(None).await.expect("Failed to decline the request"); } #[async_test] @@ -176,24 +193,24 @@ mod tests { let room = server.sync_joined_room(&client, room_id).await; - let join_request = mock_join_request(&room, None); + let knock_request = make_knock_request(&room, None); // The /ban endpoint must be called once server.mock_ban_user().ok().mock_once().mount().await; - // When we decline the join request and ban the user from the room - join_request + // When we decline the knock request and ban the user from the room + knock_request .decline_and_ban(None) .await .expect("Failed to decline the request and ban the user"); } - fn mock_join_request(room: &Room, event_id: Option<&EventId>) -> JoinRequest { - JoinRequest::new( + fn make_knock_request(room: &Room, event_id: Option<&EventId>) -> KnockRequest { + KnockRequest::new( room, event_id.unwrap_or(event_id!("$a:b.c")), None, - RequestToJoinMemberInfo { + KnockRequestMemberInfo { user_id: owned_user_id!("@alice:b.c"), display_name: None, avatar_url: None, diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 48830fd33cb..30a3eed4de5 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -16,7 +16,7 @@ use std::{ borrow::Borrow, - collections::{BTreeMap, HashMap, HashSet}, + collections::{BTreeMap, HashMap}, ops::Deref, sync::Arc, time::Duration, @@ -139,8 +139,8 @@ use crate::{ media::{MediaFormat, MediaRequestParameters}, notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode}, room::{ + knock_requests::{KnockRequest, KnockRequestMemberInfo}, power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt}, - request_to_join::JoinRequest, }, sync::RoomUpdate, utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent}, @@ -152,11 +152,11 @@ use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::Backu pub mod edit; pub mod futures; pub mod identity_status_changes; +/// Contains code related to requests to join a room. +pub mod knock_requests; mod member; mod messages; pub mod power_levels; -/// Contains code related to requests to join a room. -pub mod request_to_join; /// A struct containing methods that are common for Joined, Invited and Left /// Rooms @@ -3214,103 +3214,103 @@ impl Room { ObservableLiveLocation::new(&self.client, self.room_id()) } - /// Helper to requests to join this `Room`. + /// Subscribe to knock requests in this `Room`. /// /// The current requests to join the room will be emitted immediately - /// when subscribing. When a new membership event is received, a request is - /// marked as seen or there is a limited sync, a new set of requests - /// will be emitted. - pub async fn subscribe_to_join_requests(&self) -> Result>> { + /// when subscribing. + /// + /// A new set of knock requests will be emitted whenever: + /// - A new member event is received. + /// - A knock request is marked as seen. + /// - A sync is gappy (limited), so room membership information may be + /// outdated. + pub async fn subscribe_to_knock_requests( + &self, + ) -> Result>> { let this = Arc::new(self.clone()); - let requests_observable = - this.client.observe_room_events::(this.room_id()); + let room_member_events_observer = + self.client.observe_room_events::(this.room_id()); - let (current_seen_ids, mut seen_request_ids_stream) = - this.subscribe_to_seen_join_request_ids().await?; + let current_seen_ids = self.get_seen_knock_request_ids().await?; + let mut seen_request_ids_stream = self + .seen_knock_request_ids_map + .subscribe() + .await + .map(|values| values.unwrap_or_default()); - let mut room_info_stream = this.subscribe_info(); + let mut room_info_stream = self.subscribe_info(); let combined_stream = stream! { // Emit current requests to join - match this.clone().get_current_join_requests(¤t_seen_ids).await { + match this.get_current_join_requests(¤t_seen_ids).await { Ok(initial_requests) => yield initial_requests, - Err(e) => warn!("Failed to get initial requests to join: {e:?}") + Err(err) => warn!("Failed to get initial requests to join: {err}") } - let mut requests_stream = requests_observable.subscribe(); - - let mut new_event: Option = None; + let mut requests_stream = room_member_events_observer.subscribe(); let mut seen_ids = current_seen_ids.clone(); - let mut prev_seen_ids = current_seen_ids; - let mut prev_missing_room_members: bool = false; - let mut missing_room_members: bool = false; loop { // This is equivalent to a combine stream operation, triggering a new emission // when any of the branches changes tokio::select! { - Some((next, _)) = requests_stream.next() => { new_event = Some(next); } - Some(next) = seen_request_ids_stream.next() => { seen_ids = next; } - Some(next) = room_info_stream.next() => { - missing_room_members = !next.are_members_synced() - } - else => break, - } - - // We need to emit new items when we may have missing room members: - // this usually happens after a gappy (limited) sync - let has_missing_room_members = prev_missing_room_members != missing_room_members; - if has_missing_room_members { - prev_missing_room_members = missing_room_members; - } - - // We need to emit new items if the seen join request ids have changed - let has_new_seen_ids = prev_seen_ids != seen_ids; - if has_new_seen_ids { - prev_seen_ids = seen_ids.clone(); - } - - if let Some(SyncStateEvent::Original(event)) = new_event.clone() { - // Reset the new event value so we can check this again in the next loop - new_event = None; - - // If we can calculate the membership change, try to emit only when needed - if event.prev_content().is_some() { - match event.membership_change() { - MembershipChange::Banned | - MembershipChange::Knocked | - MembershipChange::KnockAccepted | - MembershipChange::KnockDenied | - MembershipChange::KnockRetracted => { - match this.clone().get_current_join_requests(&seen_ids).await { + Some((event, _)) = requests_stream.next() => { + if let Some(event) = event.as_original() { + // If we can calculate the membership change, try to emit only when needed + let emit = if event.prev_content().is_some() { + matches!(event.membership_change(), + MembershipChange::Banned | + MembershipChange::Knocked | + MembershipChange::KnockAccepted | + MembershipChange::KnockDenied | + MembershipChange::KnockRetracted + ) + } else { + // If we can't calculate the membership change, assume we need to + // emit updated values + true + }; + + if emit { + match this.get_current_join_requests(&seen_ids).await { Ok(requests) => yield requests, - Err(e) => { - warn!("Failed to get updated requests to join on membership change: {e:?}") + Err(err) => { + warn!("Failed to get updated knock requests on new member event: {err}") } } } - _ => (), } - } else { - // If we can't calculate the membership change, assume we need to - // emit updated values - match this.clone().get_current_join_requests(&seen_ids).await { + } + + Some(new_seen_ids) = seen_request_ids_stream.next() => { + // Update the current seen ids + seen_ids = new_seen_ids; + + // If seen requests have changed we need to recalculate + // all the knock requests + match this.get_current_join_requests(&seen_ids).await { Ok(requests) => yield requests, - Err(e) => { - warn!("Failed to get updated requests to join on new member event: {e:?}") + Err(err) => { + warn!("Failed to get updated knock requests on seen ids changed: {err}") } } } - } else if has_new_seen_ids || has_missing_room_members { - // If seen requests have changed or we have missing room members, - // we need to recalculate all the requests to join - match this.clone().get_current_join_requests(&seen_ids).await { - Ok(requests) => yield requests, - Err(e) => { - warn!("Failed to get updated requests to join on seen ids changed: {e:?}") + + Some(room_info) = room_info_stream.next() => { + // We need to emit new items when we may have missing room members: + // this usually happens after a gappy (limited) sync + if !room_info.are_members_synced() { + match this.get_current_join_requests(&seen_ids).await { + Ok(requests) => yield requests, + Err(err) => { + warn!("Failed to get updated knock requests on gappy (limited) sync: {err}") + } + } } } + // If the streams in all branches are closed, stop the loop + else => break, } } }; @@ -3320,81 +3320,24 @@ impl Room { async fn get_current_join_requests( &self, - seen_request_ids: &HashSet, - ) -> Result> { + seen_request_ids: &BTreeMap, + ) -> Result> { Ok(self .members(RoomMemberships::KNOCK) .await? .into_iter() .filter_map(|member| { - if let Some(event_id) = member.event().event_id() { - let event_id = event_id.to_owned(); - Some(JoinRequest::new( - self, - &event_id, - member.event().timestamp(), - member.into(), - seen_request_ids.contains(&event_id), - )) - } else { - None - } + let event_id = member.event().event_id()?; + Some(KnockRequest::new( + self, + event_id, + member.event().timestamp(), + KnockRequestMemberInfo::from_member(&member), + seen_request_ids.contains_key(event_id), + )) }) .collect()) } - - /// Mark a list of requests to join the room as seen, given their state - /// event ids. - pub async fn mark_join_requests_as_seen(&self, event_ids: &[OwnedEventId]) -> Result<()> { - let mut current_seen_events = self.get_seen_join_request_ids().await?; - - for event_id in event_ids { - current_seen_events.insert(event_id.to_owned()); - } - - self.seen_join_request_ids.set(Some(current_seen_events.clone())); - - self.client - .store() - .set_kv_data( - StateStoreDataKey::SeenJoinRequests(self.room_id()), - StateStoreDataValue::SeenJoinRequests(current_seen_events), - ) - .await - .map_err(Into::into) - } - - /// Get the list of seen requests to join event ids in this room. - pub async fn get_seen_join_request_ids(&self) -> Result> { - let current_join_request_ids = self.seen_join_request_ids.get(); - let current_join_request_ids: HashSet = - if let Some(requests) = current_join_request_ids.as_ref() { - requests.clone() - } else { - let requests = self - .client - .store() - .get_kv_data(StateStoreDataKey::SeenJoinRequests(self.room_id())) - .await? - .and_then(|v| v.into_seen_join_requests()) - .unwrap_or_default(); - - self.seen_join_request_ids.set(Some(requests.clone())); - requests - }; - Ok(current_join_request_ids) - } - - /// Subscribes to the set of requests to join that have been marked as - /// 'seen'. - pub async fn subscribe_to_seen_join_request_ids( - &self, - ) -> Result<(HashSet, impl Stream>)> { - let current = self.get_seen_join_request_ids().await?; - let subscriber = - self.seen_join_request_ids.subscribe().map(|values| values.unwrap_or_default()); - Ok((current, subscriber)) - } } #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))] @@ -3684,10 +3627,14 @@ pub struct TryFromReportedContentScoreError(()); mod tests { use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft, SessionMeta}; use matrix_sdk_test::{ - async_test, test_json, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, - DEFAULT_TEST_ROOM_ID, + async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent, + SyncResponseBuilder, + }; + use ruma::{ + device_id, event_id, + events::room::member::{MembershipState, RoomMemberEventContent}, + int, room_id, user_id, }; - use ruma::{device_id, event_id, int, user_id}; use wiremock::{ matchers::{header, method, path_regex}, Mock, MockServer, ResponseTemplate, @@ -3878,23 +3825,36 @@ mod tests { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; let event_id = event_id!("$a:b.c"); - - let room = server.sync_joined_room(&client, &DEFAULT_TEST_ROOM_ID).await; + let room_id = room_id!("!a:b.c"); + let user_id = user_id!("@alice:b.c"); + + let f = EventFactory::new().room(room_id); + let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f + .event(RoomMemberEventContent::new(MembershipState::Knock)) + .event_id(event_id) + .sender(user_id) + .state_key(user_id) + .into_raw_timeline() + .cast()]); + let room = server.sync_room(&client, joined_room_builder).await; // When loading the initial seen ids, there are none let seen_ids = - room.get_seen_join_request_ids().await.expect("Couldn't load seen join request ids"); + room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids"); assert!(seen_ids.is_empty()); // We mark a random event id as seen - room.mark_join_requests_as_seen(&[event_id.to_owned()]) + room.mark_knock_requests_as_seen(&[user_id.to_owned()]) .await .expect("Couldn't mark join request as seen"); // Then we can check it was successfully marked as seen let seen_ids = - room.get_seen_join_request_ids().await.expect("Couldn't load seen join request ids"); + room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids"); assert_eq!(seen_ids.len(), 1); - assert_eq!(seen_ids.into_iter().next().expect("No next value"), event_id) + assert_eq!( + seen_ids.into_iter().next().expect("No next value"), + (event_id.to_owned(), user_id.to_owned()) + ) } } diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index a6b5f38949e..4e6ca00de93 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -34,6 +34,7 @@ use ruma::{ int, mxc_uri, owned_event_id, room_id, thirdparty, user_id, OwnedUserId, TransactionId, }; use serde_json::{from_value, json, Value}; +use stream_assert::assert_pending; use wiremock::{ matchers::{body_json, body_partial_json, header, method, path_regex}, Mock, ResponseTemplate, @@ -848,55 +849,56 @@ async fn test_subscribe_to_requests_to_join() { let room_id = room_id!("!a:b.c"); let f = EventFactory::new().room(room_id); - let alice_user_id = user_id!("@alice:b.c"); - let alice_knock_event_id = event_id!("$alice-knock:b.c"); - let alice_knock_event = f + let user_id = user_id!("@alice:b.c"); + let knock_event_id = event_id!("$alice-knock:b.c"); + let knock_event = f .event(RoomMemberEventContent::new(MembershipState::Knock)) - .event_id(alice_knock_event_id) - .sender(alice_user_id) - .state_key(alice_user_id) + .event_id(knock_event_id) + .sender(user_id) + .state_key(user_id) .into_raw_timeline() .cast(); - server.mock_get_members().ok(vec![alice_knock_event]).mock_once().mount().await; + server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await; let room = server.sync_joined_room(&client, room_id).await; - let stream = room.subscribe_to_join_requests().await.unwrap(); + let stream = room.subscribe_to_knock_requests().await.unwrap(); pin_mut!(stream); - // We receive an initial request to join from Alice + // We receive an initial knock request from Alice let initial = assert_next_with_timeout!(stream, 100); - assert!(!initial.is_empty()); + assert_eq!(initial.len(), 1); - let alices_request_to_join = &initial[0]; - assert_eq!(alices_request_to_join.event_id, alice_knock_event_id); - assert!(!alices_request_to_join.is_seen); + let knock_request = &initial[0]; + assert_eq!(knock_request.event_id, knock_event_id); + assert!(!knock_request.is_seen); - // We then mark the request to join as seen - room.mark_join_requests_as_seen(&[alice_knock_event_id.to_owned()]).await.unwrap(); + // We then mark the knock request as seen + room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap(); // Now it's received again as seen let seen = assert_next_with_timeout!(stream, 100); - assert!(!seen.is_empty()); - let alices_seen_request_to_join = &seen[0]; - assert_eq!(alices_seen_request_to_join.event_id, alice_knock_event_id); - assert!(alices_seen_request_to_join.is_seen); + assert_eq!(initial.len(), 1); + let seen_knock = &seen[0]; + assert_eq!(seen_knock.event_id, knock_event_id); + assert!(seen_knock.is_seen); // If we then receive a new member event for Alice that's not 'knock' - let alice_join_event_id = event_id!("$alice-join:b.c"); let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f .event(RoomMemberEventContent::new(MembershipState::Invite)) - .event_id(alice_join_event_id) - .sender(alice_user_id) - .state_key(alice_user_id) + .sender(user_id) + .state_key(user_id) .into_raw_timeline() .cast()]); server.sync_room(&client, joined_room_builder).await; - // The requests to join are now empty + // The knock requests are now empty let updated_requests = assert_next_with_timeout!(stream, 100); assert!(updated_requests.is_empty()); + + // There should be no other knock requests + assert_pending!(stream) } #[async_test] @@ -909,19 +911,17 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() { let room_id = room_id!("!a:b.c"); let f = EventFactory::new().room(room_id); - let alice_user_id = user_id!("@alice:b.c"); - let alice_knock_event_id = event_id!("$alice-knock:b.c"); - let alice_knock_event = f + let user_id = user_id!("@alice:b.c"); + let knock_event = f .event(RoomMemberEventContent::new(MembershipState::Knock)) - .event_id(alice_knock_event_id) - .sender(alice_user_id) - .state_key(alice_user_id) + .sender(user_id) + .state_key(user_id) .into_raw_timeline() .cast(); server .mock_get_members() - .ok(vec![alice_knock_event]) + .ok(vec![knock_event]) // The endpoint will be called twice: // 1. For the initial loading of room members. // 2. When a gappy (limited) sync is received. @@ -930,18 +930,21 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() { .await; let room = server.sync_joined_room(&client, room_id).await; - let stream = room.subscribe_to_join_requests().await.unwrap(); + let stream = room.subscribe_to_knock_requests().await.unwrap(); pin_mut!(stream); - // We receive an initial request to join from Alice + // We receive an initial knock request from Alice let initial = assert_next_with_timeout!(stream, 500); assert!(!initial.is_empty()); - // This limited sync should trigger a new emission of join requests, with a + // This limited sync should trigger a new emission of knock requests, with a // reloading of the room members server.sync_room(&client, JoinedRoomBuilder::new(room_id).set_timeline_limited()).await; - // We should receive a new list of join requests + // We should receive a new list of knock requests assert_next_with_timeout!(stream, 500); + + // There should be no other knock requests + assert_pending!(stream) }