Skip to content

Commit

Permalink
feat(room): create a cleanup task in Room::subscribe_to_knock_requests
Browse files Browse the repository at this point in the history
This cleanup task will run while the knock request subscription runs and will use the `Room::room_member_updates_sender` notification to call `Room::remove_outdated_seen_knock_requests_ids` and remove outdated seen knock request ids automatically.
  • Loading branch information
jmartinesp committed Dec 18, 2024
1 parent 29257c2 commit 06c4332
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 16 deletions.
4 changes: 3 additions & 1 deletion bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,13 +924,15 @@ impl Room {
self: Arc<Self>,
listener: Box<dyn KnockRequestsListener>,
) -> Result<Arc<TaskHandle>, ClientError> {
let stream = self.inner.subscribe_to_knock_requests().await?;
let (stream, seen_ids_cleanup_handle) = self.inner.subscribe_to_knock_requests().await?;

let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
pin_mut!(stream);
while let Some(requests) = stream.next().await {
listener.call(requests.into_iter().map(Into::into).collect());
}
// Cancel the seen ids cleanup task
seen_ids_cleanup_handle.abort();
})));

Ok(handle)
Expand Down
28 changes: 25 additions & 3 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ use matrix_sdk_base::{
ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey,
StateStoreDataValue,
};
use matrix_sdk_common::{deserialized_responses::SyncTimelineEvent, timeout::timeout};
use matrix_sdk_common::{
deserialized_responses::SyncTimelineEvent,
executor::{spawn, JoinHandle},
timeout::timeout,
};
use mime::Mime;
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
Expand Down Expand Up @@ -3224,9 +3228,12 @@ impl Room {
/// - A knock request is marked as seen.
/// - A sync is gappy (limited), so room membership information may be
/// outdated.
///
/// Returns both a stream of knock requests and a handle for a task that
/// will clean up the seen knock request ids when possible.
pub async fn subscribe_to_knock_requests(
&self,
) -> Result<impl Stream<Item = Vec<KnockRequest>>> {
) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
let this = Arc::new(self.clone());

let room_member_events_observer =
Expand All @@ -3241,6 +3248,21 @@ impl Room {

let mut room_info_stream = self.subscribe_info();

// Spawn a task that will clean up the seen knock request ids when updated room
// members are received
let clear_seen_ids_handle = spawn({
let this = self.clone();
async move {
let mut member_updates_stream = this.room_member_updates_sender.subscribe();
while member_updates_stream.recv().await.is_ok() {
// If room members were updated, try to remove outdated seen knock request ids
if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
warn!("Failed to remove seen knock requests: {err}")
}
}
}
});

let combined_stream = stream! {
// Emit current requests to join
match this.get_current_join_requests(&current_seen_ids).await {
Expand Down Expand Up @@ -3315,7 +3337,7 @@ impl Room {
}
};

Ok(combined_stream)
Ok((combined_stream, clear_seen_ids_handle))
}

async fn get_current_join_requests(
Expand Down
127 changes: 115 additions & 12 deletions crates/matrix-sdk/tests/integration/room/joined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use ruma::{
};
use serde_json::{from_value, json, Value};
use stream_assert::assert_pending;
use tokio::time::sleep;
use wiremock::{
matchers::{body_json, body_partial_json, header, method, path_regex},
Mock, ResponseTemplate,
Expand Down Expand Up @@ -840,7 +841,7 @@ async fn test_enable_encryption_doesnt_stay_unencrypted() {
}

#[async_test]
async fn test_subscribe_to_requests_to_join() {
async fn test_subscribe_to_knock_requests() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

Expand All @@ -862,7 +863,7 @@ async fn test_subscribe_to_requests_to_join() {
server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await;

let room = server.sync_joined_room(&client, room_id).await;
let stream = room.subscribe_to_knock_requests().await.unwrap();
let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap();

pin_mut!(stream);

Expand Down Expand Up @@ -893,16 +894,30 @@ async fn test_subscribe_to_requests_to_join() {
.cast()]);
server.sync_room(&client, joined_room_builder).await;

// The knock requests are now empty
// The knock requests are now empty because we have new member events
let updated_requests = assert_next_with_timeout!(stream, 100);
assert!(updated_requests.is_empty());

// And it's emitted again because the seen id value has changed
let updated_requests = assert_next_with_timeout!(stream, 100);
assert!(updated_requests.is_empty());

// There should be no other knock requests
assert_pending!(stream)
assert_pending!(stream);

// The seen knock request id is no longer there because the associated knock
// request doesn't exist anymore
let seen_knock_request_ids = room
.get_seen_knock_request_ids()
.await
.expect("could not get current seen knock request ids");
assert!(seen_knock_request_ids.is_empty());

handle.abort();
}

#[async_test]
async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() {
async fn test_subscribe_to_knock_requests_reloads_members_on_limited_sync() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

Expand Down Expand Up @@ -930,7 +945,7 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() {
.await;

let room = server.sync_joined_room(&client, room_id).await;
let stream = room.subscribe_to_knock_requests().await.unwrap();
let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap();

pin_mut!(stream);

Expand All @@ -946,7 +961,6 @@ async fn test_subscribe_to_requests_to_join_reloads_members_on_limited_sync() {
assert_next_with_timeout!(stream, 500);

// There should be no other knock requests
assert_pending!(stream)
assert_pending!(stream);

handle.abort();
Expand All @@ -973,7 +987,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_membership_changed()
.cast();

// When syncing the room, we'll have a knock request coming from alice
let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])).await;
let room = server
.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event]))
.await;

// We then mark the knock request as seen
room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap();
Expand All @@ -997,7 +1013,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_membership_changed()
room.sync_members().await.expect("could not reload room members");

// Calling remove outdated seen knock request ids will remove the seen id
room.remove_outdated_seen_knock_requests_ids().await.expect("could not remove outdated seen knock request ids");
room.remove_outdated_seen_knock_requests_ids()
.await
.expect("could not remove outdated seen knock request ids");

let seen = room.get_seen_knock_request_ids().await.unwrap();
assert!(seen.is_empty());
Expand All @@ -1024,7 +1042,9 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k
.cast();

// When syncing the room, we'll have a knock request coming from alice
let room = server.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event])).await;
let room = server
.sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk(vec![knock_event]))
.await;

// We then mark the knock request as seen
room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap();
Expand All @@ -1033,7 +1053,8 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k
let seen = room.get_seen_knock_request_ids().await.unwrap();
assert_eq!(seen.len(), 1);

// If we then load the members again and the previously knocking member has a different event id
// If we then load the members again and the previously knocking member has a
// different event id
let knock_event = f
.event(RoomMemberEventContent::new(MembershipState::Knock))
.event_id(event_id!("$knock-2:b.c"))
Expand All @@ -1048,8 +1069,90 @@ async fn test_remove_outdated_seen_knock_requests_ids_when_we_have_an_outdated_k
room.sync_members().await.expect("could not reload room members");

// Calling remove outdated seen knock request ids will remove the seen id
room.remove_outdated_seen_knock_requests_ids().await.expect("could not remove outdated seen knock request ids");
room.remove_outdated_seen_knock_requests_ids()
.await
.expect("could not remove outdated seen knock request ids");

let seen = room.get_seen_knock_request_ids().await.unwrap();
assert!(seen.is_empty());
}

#[async_test]
async fn test_subscribe_to_knock_requests_clears_seen_ids_on_member_reload() {
let server = MatrixMockServer::new().await;
let client = server.client_builder().build().await;

server.mock_room_state_encryption().plain().mount().await;

let room_id = room_id!("!a:b.c");
let f = EventFactory::new().room(room_id);

let user_id = user_id!("@alice:b.c");
let knock_event_id = event_id!("$alice-knock:b.c");
let knock_event = f
.event(RoomMemberEventContent::new(MembershipState::Knock))
.event_id(knock_event_id)
.sender(user_id)
.state_key(user_id)
.into_raw_timeline()
.cast();

server.mock_get_members().ok(vec![knock_event]).mock_once().mount().await;

let room = server.sync_joined_room(&client, room_id).await;
let (stream, handle) = room.subscribe_to_knock_requests().await.unwrap();

pin_mut!(stream);

// We receive an initial knock request from Alice
let initial = assert_next_with_timeout!(stream, 100);
assert_eq!(initial.len(), 1);

let knock_request = &initial[0];
assert_eq!(knock_request.event_id, knock_event_id);
assert!(!knock_request.is_seen);

// We then mark the knock request as seen
room.mark_knock_requests_as_seen(&[user_id.to_owned()]).await.unwrap();

// Now it's received again as seen
let seen = assert_next_with_timeout!(stream, 100);
assert_eq!(seen.len(), 1);
let seen_knock = &seen[0];
assert_eq!(seen_knock.event_id, knock_event_id);
assert!(seen_knock.is_seen);

// If we then load the members again and the previously knocking member is in
// another state now
let joined_event = f
.event(RoomMemberEventContent::new(MembershipState::Join))
.sender(user_id)
.state_key(user_id)
.into_raw_timeline()
.cast();

server.mock_get_members().ok(vec![joined_event]).mock_once().mount().await;

room.mark_members_missing();
room.sync_members().await.expect("could not reload room members");

// The knock requests are now empty because we have new member events
let updated_requests = assert_next_with_timeout!(stream, 100);
assert!(updated_requests.is_empty());

// There should be no other knock requests
assert_pending!(stream);

// Give some time for the seen ids purging to be done
sleep(Duration::from_millis(100)).await;

// The seen knock request id is no longer there because the associated knock
// request doesn't exist anymore
let seen_knock_request_ids = room
.get_seen_knock_request_ids()
.await
.expect("could not get current seen knock request ids");
assert!(seen_knock_request_ids.is_empty());

handle.abort();
}

0 comments on commit 06c4332

Please sign in to comment.