Skip to content

Commit

Permalink
feat(room): allow subscribing to requests to join a room
Browse files Browse the repository at this point in the history
  • Loading branch information
jmartinesp committed Dec 10, 2024
1 parent ffe2d57 commit 8f0beb9
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 15 deletions.
93 changes: 92 additions & 1 deletion bindings/matrix-sdk-ffi/src/room.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, pin::pin, sync::Arc};

use anyhow::{Context, Result};
use futures_util::StreamExt;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
crypto::LocalTrust,
event_cache::paginator::PaginatorError,
Expand Down Expand Up @@ -882,6 +882,97 @@ impl Room {

Ok(())
}

/// Subscribes to requests to join this room, using a `listener` to be
/// notified of the changes.
///
/// The current requests to join the room will be emitted immediately
/// when subscribing, along with a [`TaskHandle`] to cancel the
/// subscription.
pub async fn subscribe_to_requests_to_join(
self: Arc<Self>,
listener: Box<dyn RequestsToJoinListener>,
) -> Result<Arc<TaskHandle>, ClientError> {
let stream = self.inner.subscribe_to_requests_to_join().await?;

let handle = Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
pin_mut!(stream);
while let Some(requests) = stream.next().await {
listener.call(requests.into_iter().map(Into::into).collect());
}
})));

Ok(handle)
}
}

impl From<matrix_sdk::room::request_to_join::RequestToJoinRoom> for RequestToJoin {
fn from(request: matrix_sdk::room::request_to_join::RequestToJoinRoom) -> Self {
Self {
event_id: request.event_id.to_string(),
user_id: request.member_info.user_id.to_string(),
room_id: request.room_id().to_string(),
display_name: request.member_info.display_name.clone(),
avatar_url: request.member_info.avatar_url.as_ref().map(|url| url.to_string()),
reason: request.member_info.reason.clone(),
is_seen: request.is_seen,
actions: Arc::new(RequestToJoinActions { inner: request }),
}
}
}

/// A listener for receiving new requests to a join a room.
#[matrix_sdk_ffi_macros::export(callback_interface)]
pub trait RequestsToJoinListener: Send + Sync {
fn call(&self, requests_to_join: Vec<RequestToJoin>);
}

/// An FFI representation of a request to join a room.
#[derive(Debug, Clone, uniffi::Record)]
pub struct RequestToJoin {
/// The event id of the event that contains the `knock` membership change.
pub event_id: String,
/// The user id of the user who's requesting to join the room.
pub user_id: String,
/// The room id of the room whose access was requested.
pub room_id: String,
/// The optional display name of the user who's requesting to join the room.
pub display_name: Option<String>,
/// The optional avatar url of the user who's requesting to join the room.
pub avatar_url: Option<String>,
/// An optional reason why the user wants join the room.
pub reason: Option<String>,
/// Whether the request to join has been marked as `seen` so it can be
/// filtered by the client.
pub is_seen: bool,
/// A set of actions to perform for this request to join.
pub actions: Arc<RequestToJoinActions>,
}

/// A set of actions to perform for a request to join.
#[derive(Debug, Clone, uniffi::Object)]
pub struct RequestToJoinActions {
inner: matrix_sdk::room::request_to_join::RequestToJoinRoom,
}

#[matrix_sdk_ffi_macros::export]
impl RequestToJoinActions {
/// Accepts the request to join by inviting the user to the room.
pub async fn accept(&self) -> Result<(), ClientError> {
self.inner.accept().await.map_err(Into::into)
}

/// Declines the request to join by kicking the user from the room with an
/// optional reason.
pub async fn decline(&self, reason: Option<String>) -> Result<(), ClientError> {
self.inner.decline(reason.as_deref()).await.map_err(Into::into)
}

/// Declines the request to join by banning the user from the room with an
/// optional reason.
pub async fn decline_and_ban(&self, reason: Option<String>) -> Result<(), ClientError> {
self.inner.decline_and_ban(reason.as_deref()).await.map_err(Into::into)
}
}

/// Generates a `matrix.to` permalink to the given room alias.
Expand Down
114 changes: 114 additions & 0 deletions crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
time::Duration,
};

use async_stream::stream;
#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
use async_trait::async_trait;
use eyeball::SharedObservable;
Expand Down Expand Up @@ -80,6 +81,7 @@ use ruma::{
avatar::{self, RoomAvatarEventContent},
encryption::RoomEncryptionEventContent,
history_visibility::HistoryVisibility,
member::{MembershipChange, SyncRoomMemberEvent},
message::{
AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
Expand Down Expand Up @@ -119,6 +121,7 @@ use ruma::{
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tracing::{debug, info, instrument, warn};

use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
Expand Down Expand Up @@ -3205,6 +3208,117 @@ impl Room {
ObservableLiveLocation::new(&self.client, self.room_id())
}

/// Helper to requests to join this `Room`. It returns both a list with the
/// initial items and any new request to join received.
pub async fn subscribe_to_requests_to_join(
&self,
) -> Result<impl Stream<Item = Vec<RequestToJoinRoom>>> {
let this = Arc::new(self.clone());

let requests_observable =
this.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());

let (current_seen_ids, mut seen_request_ids_stream) =
this.subscribe_to_seen_requests_to_join_ids().await?;

let combined_stream = stream! {
// Emit current requests to join
match this.clone().get_current_requests_to_join(&current_seen_ids).await {
Ok(initial_requests) => yield initial_requests,
Err(e) => warn!("Failed to get initial requests to join: {e:?}")
}

let mut requests_stream = requests_observable.subscribe();

let mut new_event: Option<SyncRoomMemberEvent> = None;
let mut seen_ids = current_seen_ids.clone();
let mut prev_seen_ids = current_seen_ids;

loop {
// This is equivalent to a combine stream operation, triggering a new emission
// when any of the 2 sides changes
tokio::select! {
Some((next, _)) = requests_stream.next() => { new_event = Some(next); }
Some(next) = seen_request_ids_stream.next() => { seen_ids = next; }
else => break,
}

let has_new_seen_ids = prev_seen_ids != seen_ids;
if has_new_seen_ids {
prev_seen_ids = seen_ids.clone();
}

if let Some(SyncStateEvent::Original(event)) = new_event.clone() {
// Reset the new event value so we can check this again in the next loop
new_event = None;

// If we can calculate the membership change, try to emit only when needed
if event.prev_content().is_some() {
match event.membership_change() {
MembershipChange::Banned |
MembershipChange::Knocked |
MembershipChange::KnockAccepted |
MembershipChange::KnockDenied |
MembershipChange::KnockRetracted => {
match this.clone().get_current_requests_to_join(&seen_ids).await {
Ok(requests) => yield requests,
Err(e) => {
warn!("Failed to get updated requests to join on membership change: {e:?}")
}
}
}
_ => (),
}
} else {
// If we can't calculate the membership change, assume we need to
// emit updated values
match this.clone().get_current_requests_to_join(&seen_ids).await {
Ok(requests) => yield requests,
Err(e) => {
warn!("Failed to get updated requests to join on new member event: {e:?}")
}
}
}
} else if has_new_seen_ids {
// If seen requests have changed, we need to recalculate all the
// requests to join
match this.clone().get_current_requests_to_join(&seen_ids).await {
Ok(requests) => yield requests,
Err(e) => {
warn!("Failed to get updated requests to join on seen ids changed: {e:?}")
}
}
}
}
};

Ok(combined_stream)
}

async fn get_current_requests_to_join(
self: Arc<Self>,
seen_request_ids: &HashSet<OwnedEventId>,
) -> Result<Vec<RequestToJoinRoom>> {
Ok(self
.members(RoomMemberships::KNOCK)
.await?
.into_iter()
.filter_map(|member| {
if let Some(event_id) = member.event().event_id() {
let event_id = event_id.to_owned();
Some(RequestToJoinRoom::new(
self.clone(),
&event_id,
member.into(),
seen_request_ids.contains(&event_id),
))
} else {
None
}
})
.collect())
}

/// Mark a list of requests to join the room as seen, given their state
/// event ids.
pub async fn mark_requests_to_join_as_seen(&self, event_ids: &[OwnedEventId]) -> Result<()> {
Expand Down
17 changes: 3 additions & 14 deletions crates/matrix-sdk/src/room/request_to_join.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use ruma::{
events::room::member::OriginalSyncRoomMemberEvent, EventId, OwnedEventId, OwnedMxcUri,
EventId, OwnedEventId, OwnedMxcUri,
OwnedUserId, RoomId,
};

Expand Down Expand Up @@ -36,9 +36,9 @@ impl RequestToJoinRoom {

/// Marks the request to join as 'seen' so the client can ignore it in the
/// future.
pub async fn mark_as_seen(&mut self) -> Result<bool, Error> {
pub async fn mark_as_seen(&mut self) -> Result<(), Error> {
self.room.mark_requests_to_join_as_seen(&[self.event_id.to_owned()]).await?;
Ok(true)
Ok(())
}

/// Accepts the request to join by inviting the user to the room.
Expand Down Expand Up @@ -82,14 +82,3 @@ impl From<RoomMember> for RequestToJoinMemberInfo {
}
}
}

impl From<OriginalSyncRoomMemberEvent> for RequestToJoinMemberInfo {
fn from(member: OriginalSyncRoomMemberEvent) -> Self {
Self {
user_id: member.state_key,
display_name: member.content.displayname,
avatar_url: member.content.avatar_url,
reason: member.content.reason,
}
}
}

0 comments on commit 8f0beb9

Please sign in to comment.