From 8f0beb98bb649d768eb6a94a408159fccabc8ff0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jorge=20Mart=C3=ADn?= Date: Wed, 27 Nov 2024 13:18:07 +0100 Subject: [PATCH] feat(room): allow subscribing to requests to join a room --- bindings/matrix-sdk-ffi/src/room.rs | 93 +++++++++++++- crates/matrix-sdk/src/room/mod.rs | 114 ++++++++++++++++++ crates/matrix-sdk/src/room/request_to_join.rs | 17 +-- 3 files changed, 209 insertions(+), 15 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index eeb6eb4af17..ccb2b5173d5 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, pin::pin, sync::Arc}; use anyhow::{Context, Result}; -use futures_util::StreamExt; +use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ crypto::LocalTrust, event_cache::paginator::PaginatorError, @@ -882,6 +882,97 @@ impl Room { Ok(()) } + + /// Subscribes to requests to join this room, using a `listener` to be + /// notified of the changes. + /// + /// The current requests to join the room will be emitted immediately + /// when subscribing, along with a [`TaskHandle`] to cancel the + /// subscription. + pub async fn subscribe_to_requests_to_join( + self: Arc, + listener: Box, + ) -> Result, ClientError> { + let stream = self.inner.subscribe_to_requests_to_join().await?; + + let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move { + pin_mut!(stream); + while let Some(requests) = stream.next().await { + listener.call(requests.into_iter().map(Into::into).collect()); + } + }))); + + Ok(handle) + } +} + +impl From for RequestToJoin { + fn from(request: matrix_sdk::room::request_to_join::RequestToJoinRoom) -> Self { + Self { + event_id: request.event_id.to_string(), + user_id: request.member_info.user_id.to_string(), + room_id: request.room_id().to_string(), + display_name: request.member_info.display_name.clone(), + avatar_url: request.member_info.avatar_url.as_ref().map(|url| url.to_string()), + reason: request.member_info.reason.clone(), + is_seen: request.is_seen, + actions: Arc::new(RequestToJoinActions { inner: request }), + } + } +} + +/// A listener for receiving new requests to a join a room. +#[matrix_sdk_ffi_macros::export(callback_interface)] +pub trait RequestsToJoinListener: Send + Sync { + fn call(&self, requests_to_join: Vec); +} + +/// An FFI representation of a request to join a room. +#[derive(Debug, Clone, uniffi::Record)] +pub struct RequestToJoin { + /// The event id of the event that contains the `knock` membership change. + pub event_id: String, + /// The user id of the user who's requesting to join the room. + pub user_id: String, + /// The room id of the room whose access was requested. + pub room_id: String, + /// The optional display name of the user who's requesting to join the room. + pub display_name: Option, + /// The optional avatar url of the user who's requesting to join the room. + pub avatar_url: Option, + /// An optional reason why the user wants join the room. + pub reason: Option, + /// Whether the request to join has been marked as `seen` so it can be + /// filtered by the client. + pub is_seen: bool, + /// A set of actions to perform for this request to join. + pub actions: Arc, +} + +/// A set of actions to perform for a request to join. +#[derive(Debug, Clone, uniffi::Object)] +pub struct RequestToJoinActions { + inner: matrix_sdk::room::request_to_join::RequestToJoinRoom, +} + +#[matrix_sdk_ffi_macros::export] +impl RequestToJoinActions { + /// Accepts the request to join by inviting the user to the room. + pub async fn accept(&self) -> Result<(), ClientError> { + self.inner.accept().await.map_err(Into::into) + } + + /// Declines the request to join by kicking the user from the room with an + /// optional reason. + pub async fn decline(&self, reason: Option) -> Result<(), ClientError> { + self.inner.decline(reason.as_deref()).await.map_err(Into::into) + } + + /// Declines the request to join by banning the user from the room with an + /// optional reason. + pub async fn decline_and_ban(&self, reason: Option) -> Result<(), ClientError> { + self.inner.decline_and_ban(reason.as_deref()).await.map_err(Into::into) + } } /// Generates a `matrix.to` permalink to the given room alias. diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 6477dd09571..6c5b75f5d43 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -22,6 +22,7 @@ use std::{ time::Duration, }; +use async_stream::stream; #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))] use async_trait::async_trait; use eyeball::SharedObservable; @@ -80,6 +81,7 @@ use ruma::{ avatar::{self, RoomAvatarEventContent}, encryption::RoomEncryptionEventContent, history_visibility::HistoryVisibility, + member::{MembershipChange, SyncRoomMemberEvent}, message::{ AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent, FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent, @@ -119,6 +121,7 @@ use ruma::{ use serde::de::DeserializeOwned; use thiserror::Error; use tokio::sync::broadcast; +use tokio_stream::StreamExt; use tracing::{debug, info, instrument, warn}; use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent}; @@ -3205,6 +3208,117 @@ impl Room { ObservableLiveLocation::new(&self.client, self.room_id()) } + /// Helper to requests to join this `Room`. It returns both a list with the + /// initial items and any new request to join received. + pub async fn subscribe_to_requests_to_join( + &self, + ) -> Result>> { + let this = Arc::new(self.clone()); + + let requests_observable = + this.client.observe_room_events::(this.room_id()); + + let (current_seen_ids, mut seen_request_ids_stream) = + this.subscribe_to_seen_requests_to_join_ids().await?; + + let combined_stream = stream! { + // Emit current requests to join + match this.clone().get_current_requests_to_join(¤t_seen_ids).await { + Ok(initial_requests) => yield initial_requests, + Err(e) => warn!("Failed to get initial requests to join: {e:?}") + } + + let mut requests_stream = requests_observable.subscribe(); + + let mut new_event: Option = None; + let mut seen_ids = current_seen_ids.clone(); + let mut prev_seen_ids = current_seen_ids; + + loop { + // This is equivalent to a combine stream operation, triggering a new emission + // when any of the 2 sides changes + tokio::select! { + Some((next, _)) = requests_stream.next() => { new_event = Some(next); } + Some(next) = seen_request_ids_stream.next() => { seen_ids = next; } + else => break, + } + + let has_new_seen_ids = prev_seen_ids != seen_ids; + if has_new_seen_ids { + prev_seen_ids = seen_ids.clone(); + } + + if let Some(SyncStateEvent::Original(event)) = new_event.clone() { + // Reset the new event value so we can check this again in the next loop + new_event = None; + + // If we can calculate the membership change, try to emit only when needed + if event.prev_content().is_some() { + match event.membership_change() { + MembershipChange::Banned | + MembershipChange::Knocked | + MembershipChange::KnockAccepted | + MembershipChange::KnockDenied | + MembershipChange::KnockRetracted => { + match this.clone().get_current_requests_to_join(&seen_ids).await { + Ok(requests) => yield requests, + Err(e) => { + warn!("Failed to get updated requests to join on membership change: {e:?}") + } + } + } + _ => (), + } + } else { + // If we can't calculate the membership change, assume we need to + // emit updated values + match this.clone().get_current_requests_to_join(&seen_ids).await { + Ok(requests) => yield requests, + Err(e) => { + warn!("Failed to get updated requests to join on new member event: {e:?}") + } + } + } + } else if has_new_seen_ids { + // If seen requests have changed, we need to recalculate all the + // requests to join + match this.clone().get_current_requests_to_join(&seen_ids).await { + Ok(requests) => yield requests, + Err(e) => { + warn!("Failed to get updated requests to join on seen ids changed: {e:?}") + } + } + } + } + }; + + Ok(combined_stream) + } + + async fn get_current_requests_to_join( + self: Arc, + seen_request_ids: &HashSet, + ) -> Result> { + Ok(self + .members(RoomMemberships::KNOCK) + .await? + .into_iter() + .filter_map(|member| { + if let Some(event_id) = member.event().event_id() { + let event_id = event_id.to_owned(); + Some(RequestToJoinRoom::new( + self.clone(), + &event_id, + member.into(), + seen_request_ids.contains(&event_id), + )) + } else { + None + } + }) + .collect()) + } + /// Mark a list of requests to join the room as seen, given their state /// event ids. pub async fn mark_requests_to_join_as_seen(&self, event_ids: &[OwnedEventId]) -> Result<()> { diff --git a/crates/matrix-sdk/src/room/request_to_join.rs b/crates/matrix-sdk/src/room/request_to_join.rs index 5ce78367e4d..cad417f6a49 100644 --- a/crates/matrix-sdk/src/room/request_to_join.rs +++ b/crates/matrix-sdk/src/room/request_to_join.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use ruma::{ - events::room::member::OriginalSyncRoomMemberEvent, EventId, OwnedEventId, OwnedMxcUri, + EventId, OwnedEventId, OwnedMxcUri, OwnedUserId, RoomId, }; @@ -36,9 +36,9 @@ impl RequestToJoinRoom { /// Marks the request to join as 'seen' so the client can ignore it in the /// future. - pub async fn mark_as_seen(&mut self) -> Result { + pub async fn mark_as_seen(&mut self) -> Result<(), Error> { self.room.mark_requests_to_join_as_seen(&[self.event_id.to_owned()]).await?; - Ok(true) + Ok(()) } /// Accepts the request to join by inviting the user to the room. @@ -82,14 +82,3 @@ impl From for RequestToJoinMemberInfo { } } } - -impl From for RequestToJoinMemberInfo { - fn from(member: OriginalSyncRoomMemberEvent) -> Self { - Self { - user_id: member.state_key, - display_name: member.content.displayname, - avatar_url: member.content.avatar_url, - reason: member.content.reason, - } - } -}