diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index 1402d5d986..3cef3ca8e6 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -924,13 +924,15 @@ impl Room { self: Arc, listener: Box, ) -> Result, ClientError> { - let stream = self.inner.subscribe_to_knock_requests().await?; + let (stream, seen_ids_cleanup_handle) = self.inner.subscribe_to_knock_requests().await?; let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move { pin_mut!(stream); while let Some(requests) = stream.next().await { listener.call(requests.into_iter().map(Into::into).collect()); } + // Cancel the seen ids cleanup task + seen_ids_cleanup_handle.abort(); }))); Ok(handle) diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 6ee2995822..a4a089652a 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -47,7 +47,11 @@ use matrix_sdk_base::{ ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey, StateStoreDataValue, }; -use matrix_sdk_common::{deserialized_responses::SyncTimelineEvent, timeout::timeout}; +use matrix_sdk_common::{ + deserialized_responses::SyncTimelineEvent, + executor::{spawn, JoinHandle}, + timeout::timeout, +}; use mime::Mime; #[cfg(feature = "e2e-encryption")] use ruma::events::{ @@ -3224,9 +3228,12 @@ impl Room { /// - A knock request is marked as seen. /// - A sync is gappy (limited), so room membership information may be /// outdated. + /// + /// Returns both a stream of knock requests and a handle for a task that + /// will clean up the seen knock request ids when possible. pub async fn subscribe_to_knock_requests( &self, - ) -> Result>> { + ) -> Result<(impl Stream>, JoinHandle<()>)> { let this = Arc::new(self.clone()); let room_member_events_observer = @@ -3241,6 +3248,21 @@ impl Room { let mut room_info_stream = self.subscribe_info(); + // Spawn a task that will clean up the seen knock request ids when updated room + // members are received + let clear_seen_ids_handle = spawn({ + let this = self.clone(); + async move { + let mut member_updates_stream = this.room_member_updates_sender.subscribe(); + while member_updates_stream.recv().await.is_ok() { + // If room members were updated, try to remove outdated seen knock request ids + if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await { + warn!("Failed to remove seen knock requests: {err}") + } + } + } + }); + let combined_stream = stream! { // Emit current requests to join match this.get_current_join_requests(¤t_seen_ids).await { @@ -3315,7 +3337,7 @@ impl Room { } }; - Ok(combined_stream) + Ok((combined_stream, clear_seen_ids_handle)) } async fn get_current_join_requests( diff --git a/crates/matrix-sdk/tests/integration/room/joined.rs b/crates/matrix-sdk/tests/integration/room/joined.rs index 76f9b38a03..e024c3567d 100644 --- a/crates/matrix-sdk/tests/integration/room/joined.rs +++ b/crates/matrix-sdk/tests/integration/room/joined.rs @@ -35,6 +35,7 @@ use ruma::{ }; use serde_json::{from_value, json, Value}; use stream_assert::assert_pending; +use tokio::time::sleep; use wiremock::{ matchers::{body_json, body_partial_json, header, method, path_regex}, Mock, ResponseTemplate, @@ -840,7 +841,7 @@ async fn test_enable_encryption_doesnt_stay_unencrypted() { } #[async_test] -async fn test_subscribe_to_requests_to_join() { +async fn test_subscribe_to_knock_requests() { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; @@ -862,7 +863,7 @@ async fn test_subscribe_to_requests_to_join() { server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await; let room = server.sync_joined_room(&client, room_id).await; - let stream = room.subscribe_to_knock_requests().await.unwrap(); + let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap(); pin_mut!(stream); @@ -893,16 +894,30 @@ async fn test_subscribe_to_requests_to_join() { .cast()]); server.sync_room(&client, joined_room_builder).await; - // The knock requests are now empty + // The knock requests are now empty because we have new member events + let updated_requests = assert_next_with_timeout!(stream, 100); + assert!(updated_requests.is_empty()); + + // And it's emitted again because the seen id value has changed let updated_requests = assert_next_with_timeout!(stream, 100); assert!(updated_requests.is_empty()); // There should be no other knock requests - assert_pending!(stream) + assert_pending!(stream); + + // The seen knock request id is no longer there because the associated knock + // request doesn't exist anymore + let seen_knock_request_ids = room + .get_seen_knock_request_ids() + .await + .expect("could not get current seen knock request ids"); + assert!(seen_knock_request_ids.is_empty()); + + handle.abort(); } #[async_test] -async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() { +async fn test_subscribe_to_knock_requests_reloads_members_on_limited_sync() { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; @@ -930,7 +945,7 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() { .await; let room = server.sync_joined_room(&client, room_id).await; - let stream = room.subscribe_to_knock_requests().await.unwrap(); + let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap(); pin_mut!(stream); @@ -946,7 +961,6 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() { assert_next_with_timeout!(stream, 500); // There should be no other knock requests - assert_pending!(stream) assert_pending!(stream); handle.abort(); @@ -973,7 +987,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_membership_changed() .cast(); // When syncing the room, we'll have a knock request coming from alice - let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])).await; + let room = server + .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])) + .await; // We then mark the knock request as seen room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap(); @@ -997,7 +1013,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_membership_changed() room.sync_members().await.expect("could not reload room members"); // Calling remove outdated seen knock request ids will remove the seen id - room.remove_outdated_seen_knock_requests_ids().await.expect("could not remove outdated seen knock request ids"); + room.remove_outdated_seen_knock_requests_ids() + .await + .expect("could not remove outdated seen knock request ids"); let seen = room.get_seen_knock_request_ids().await.unwrap(); assert!(seen.is_empty()); @@ -1024,7 +1042,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k .cast(); // When syncing the room, we'll have a knock request coming from alice - let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])).await; + let room = server + .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])) + .await; // We then mark the knock request as seen room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap(); @@ -1033,7 +1053,8 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k let seen = room.get_seen_knock_request_ids().await.unwrap(); assert_eq!(seen.len(), 1); - // If we then load the members again and the previously knocking member has a different event id + // If we then load the members again and the previously knocking member has a + // different event id let knock_event = f .event(RoomMemberEventContent::new(MembershipState::Knock)) .event_id(event_id!("$knock-2:b.c")) @@ -1048,8 +1069,90 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k room.sync_members().await.expect("could not reload room members"); // Calling remove outdated seen knock request ids will remove the seen id - room.remove_outdated_seen_knock_requests_ids().await.expect("could not remove outdated seen knock request ids"); + room.remove_outdated_seen_knock_requests_ids() + .await + .expect("could not remove outdated seen knock request ids"); let seen = room.get_seen_knock_request_ids().await.unwrap(); assert!(seen.is_empty()); } + +#[async_test] +async fn test_subscribe_to_knock_requests_clears_seen_ids_on_member_reload() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + server.mock_room_state_encryption().plain().mount().await; + + let room_id = room_id!("!a:b.c"); + let f = EventFactory::new().room(room_id); + + let user_id = user_id!("@alice:b.c"); + let knock_event_id = event_id!("$alice-knock:b.c"); + let knock_event = f + .event(RoomMemberEventContent::new(MembershipState::Knock)) + .event_id(knock_event_id) + .sender(user_id) + .state_key(user_id) + .into_raw_timeline() + .cast(); + + server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await; + + let room = server.sync_joined_room(&client, room_id).await; + let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap(); + + pin_mut!(stream); + + // We receive an initial knock request from Alice + let initial = assert_next_with_timeout!(stream, 100); + assert_eq!(initial.len(), 1); + + let knock_request = &initial[0]; + assert_eq!(knock_request.event_id, knock_event_id); + assert!(!knock_request.is_seen); + + // We then mark the knock request as seen + room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap(); + + // Now it's received again as seen + let seen = assert_next_with_timeout!(stream, 100); + assert_eq!(seen.len(), 1); + let seen_knock = &seen[0]; + assert_eq!(seen_knock.event_id, knock_event_id); + assert!(seen_knock.is_seen); + + // If we then load the members again and the previously knocking member is in + // another state now + let joined_event = f + .event(RoomMemberEventContent::new(MembershipState::Join)) + .sender(user_id) + .state_key(user_id) + .into_raw_timeline() + .cast(); + + server.mock_get_members().ok(vec![joined_event]).mock_once().mount().await; + + room.mark_members_missing(); + room.sync_members().await.expect("could not reload room members"); + + // The knock requests are now empty because we have new member events + let updated_requests = assert_next_with_timeout!(stream, 100); + assert!(updated_requests.is_empty()); + + // There should be no other knock requests + assert_pending!(stream); + + // Give some time for the seen ids purging to be done + sleep(Duration::from_millis(100)).await; + + // The seen knock request id is no longer there because the associated knock + // request doesn't exist anymore + let seen_knock_request_ids = room + .get_seen_knock_request_ids() + .await + .expect("could not get current seen knock request ids"); + assert!(seen_knock_request_ids.is_empty()); + + handle.abort(); +}