diff --git a/changelog.d/18068.misc b/changelog.d/18068.misc
new file mode 100644
index 00000000000..af6f78f5492
--- /dev/null
+++ b/changelog.d/18068.misc
@@ -0,0 +1 @@
+Add a column `participant` to `room_memberships` table.
\ No newline at end of file
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index d8f6f8ebdc3..1144ee470de 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -128,6 +128,7 @@
     "pushers": ["enabled"],
     "redactions": ["have_censored"],
     "remote_media_cache": ["authenticated"],
+    "room_memberships": ["participant"],
     "room_stats_state": ["is_federatable"],
     "rooms": ["is_public", "has_auth_chain_index"],
     "sliding_sync_joined_rooms": ["is_encrypted"],
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index df3010ecf68..0a1ecdf5616 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1440,6 +1440,10 @@ async def handle_new_client_event(
                 )
                 return prev_event
 
+        # a message or encrypted event records the event's user as a room participant
+        if event.type in ("m.room.message", "m.room.encrypted"):
+            await self.store.set_room_participation(event.room_id, event.user_id)
+
         if event.internal_metadata.is_out_of_band_membership():
             # the only sort of out-of-band-membership events we expect to see here are
             # invite rejections and rescinded knocks that we have generated ourselves.
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 50ed6a28bf0..42d9ad2d5db 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -1606,6 +1606,42 @@ def _get_rooms_for_user_by_join_date_txn(
             from_ts,
         )
 
+    async def set_room_participation(self, room_id: str, user_id: str) -> None:
+        """
+        Record the provided user as participating in the given room
+
+        Sets `participant` to true on the `room_memberships` rows matching the
+        given user and room.
+
+        Args:
+            room_id: ID of the room to set the participant in
+            user_id: the user ID of the user
+        """
+        await self.db_pool.simple_update(
+            "room_memberships",
+            {"user_id": user_id, "room_id": room_id},
+            {"participant": True},
+            "update_room_participation",
+        )
+
+    async def get_room_participation(self, room_id: str, user_id: str) -> bool:
+        """
+        Check whether a user is listed as a participant in a room
+
+        Args:
+            room_id: ID of the room to check in
+            user_id: user ID of the user
+
+        Returns:
+            The value of the `participant` column for the matching
+            `room_memberships` row.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            "room_memberships",
+            {"user_id": user_id, "room_id": room_id},
+            "participant",
+        )
+
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
@@ -1636,6 +1672,94 @@ def __init__(
             columns=["user_id", "room_id"],
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "populate_participant_bg_update", self._populate_participant
+        )
+
+    async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int:
+        """
+        Background update to populate column `participant` on `room_memberships` table
+        one room at a time
+
+        Args:
+            progress: persisted state of the update; `last_room_id` holds the
+                last room that was processed
+            batch_size: unused, exactly one room is processed per invocation
+
+        Returns:
+            1 on every invocation, including the one that ends the update
+        """
+        last_room_id = progress.get("last_room_id", "")
+
+        def _get_current_room_txn(
+            txn: LoggingTransaction, last_room_id: str
+        ) -> Optional[str]:
+            # the next room, in room_id order, after the one handled last time
+            sql = """
+                SELECT room_id from room_memberships WHERE room_id > ?
+                ORDER BY room_id
+                LIMIT 1;
+            """
+            txn.execute(sql, (last_room_id,))
+            res = txn.fetchone()
+            return res[0] if res else None
+
+        def _background_populate_participant_per_room_txn(
+            txn: LoggingTransaction, current_room_id: str
+        ) -> None:
+            # The event types must be parenthesised: AND binds tighter than OR, so
+            # without the grouping the sender check is not applied to
+            # `m.room.message` rows and the room and membership checks are not
+            # applied to `m.room.encrypted` rows.
+            sql = """
+                SELECT DISTINCT c.state_key
+                FROM current_state_events AS c
+                INNER JOIN events AS e USING(room_id)
+                WHERE room_id = ?
+                AND c.membership = 'join'
+                AND (
+                    e.type = 'm.room.message'
+                    OR e.type = 'm.room.encrypted'
+                )
+                AND c.state_key = e.sender;
+            """
+
+            txn.execute(sql, (current_room_id,))
+
+            for (participant,) in txn.fetchall():
+                self.db_pool.simple_update_txn(
+                    txn,
+                    table="room_memberships",
+                    keyvalues={"user_id": participant, "room_id": current_room_id},
+                    updatevalues={"participant": True},
+                )
+
+        current_room_id = await self.db_pool.runInteraction(
+            "_get_current_room_txn", _get_current_room_txn, last_room_id
+        )
+        if not current_room_id:
+            # no rooms left to process: mark the background update as finished
+            await self.db_pool.updates._end_background_update(
+                "populate_participant_bg_update"
+            )
+            return 1
+
+        await self.db_pool.runInteraction(
+            "_background_populate_participant_per_room_txn",
+            _background_populate_participant_per_room_txn,
+            current_room_id,
+        )
+
+        # persist how far we got so the next invocation moves on to the next room
+        progress["last_room_id"] = current_room_id
+        await self.db_pool.runInteraction(
+            "populate_participant_bg_update",
+            self.db_pool.updates._background_update_progress_txn,
+            "populate_participant_bg_update",
+            progress,
+        )
+        return 1
+
     async def _background_add_membership_profile(
         self, progress: JsonDict, batch_size: int
     ) -> int:
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 934e1cccedb..67a5c18e90c 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -19,7 +19,7 @@
 #
 #
 
-SCHEMA_VERSION = 88  # remember to update the list below when updating
+SCHEMA_VERSION = 89  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -155,6 +155,9 @@
       be posted in response to a resettable timeout or an on-demand action.
     - Add background update to fix data integrity issue in the
       `sliding_sync_membership_snapshots` -> `forgotten` column
+
+Changes in SCHEMA_VERSION = 89
+    - Add a column `participant` to `room_memberships` table
 """
 
 
diff --git a/synapse/storage/schema/main/delta/89/01_add_column_participant_room_memberships_table.sql b/synapse/storage/schema/main/delta/89/01_add_column_participant_room_memberships_table.sql
new file mode 100644
index 00000000000..3d8ee437137
--- /dev/null
+++ b/synapse/storage/schema/main/delta/89/01_add_column_participant_room_memberships_table.sql
@@ -0,0 +1,20 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Add a column `participant` to `room_memberships` table to track whether a room member has sent
+-- a `m.room.message` or `m.room.encrypted` event into a room they are a member of
+ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE;
+
+-- Add a background update to populate `participant` column
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8901, 'populate_participant_bg_update', '{}');
\ No newline at end of file
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 4cf1a3dc519..ba0ff4d0451 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -4070,3 +4070,85 @@ def test_suspended_user_cannot_redact_messages_other_than_their_own(self) -> Non
             shorthand=False,
         )
         self.assertEqual(channel.code, 200)
+
+
+class RoomParticipantTestCase(unittest.HomeserverTestCase):
+    """Sending message-like events should mark the sender as a room participant."""
+
+    servlets = [
+        login.register_servlets,
+        room.register_servlets,
+        profile.register_servlets,
+        admin.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        # `thomas` creates the rooms; `teresa` is the user who joins and sends events
+        self.user1 = self.register_user("thomas", "hackme")
+        self.tok1 = self.login("thomas", "hackme")
+
+        self.user2 = self.register_user("teresa", "hackme")
+        self.tok2 = self.login("teresa", "hackme")
+
+        self.room1 = self.helper.create_room_as(room_creator=self.user1, tok=self.tok1)
+        self.room2 = self.helper.create_room_as(
+            self.user1, is_public=False, tok=self.tok1
+        )
+        self.store = hs.get_datastores().main
+
+    def _is_participant(self, room_id: str, user_id: str) -> bool:
+        """Fetch the stored participation flag for `user_id` in `room_id`."""
+        return self.get_success(
+            self.store.get_room_participation(room_id, user_id)
+        )
+
+    def test_sending_message_records_participation(self) -> None:
+        """
+        A user who has joined but not sent anything is not a participant; once
+        they send an m.room.message event they are
+        """
+        self.helper.join(self.room1, self.user2, tok=self.tok2)
+
+        # joined but silent so far
+        self.assertFalse(self._is_participant(self.room1, self.user2))
+
+        message_content = {
+            "msgtype": "m.text",
+            "body": "I am engaging in this room",
+        }
+        self.helper.send_event(
+            self.room1,
+            "m.room.message",
+            content=message_content,
+            tok=self.tok2,
+        )
+
+        # the message should have marked the user as a participant
+        self.assertTrue(self._is_participant(self.room1, self.user2))
+
+    def test_sending_encrypted_event_records_participation(self) -> None:
+        """
+        A user who has joined but not sent anything is not a participant; once
+        they send an m.room.encrypted event they are
+        """
+        self.helper.join(self.room1, self.user2, tok=self.tok2)
+
+        # joined but silent so far
+        self.assertFalse(self._is_participant(self.room1, self.user2))
+
+        encrypted_content = {
+            "algorithm": "m.megolm.v1.aes-sha2",
+            "ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...",
+            "device_id": "RJYKSTBOIE",
+            "sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA",
+            "session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ",
+        }
+        self.helper.send_event(
+            self.room1,
+            "m.room.encrypted",
+            content=encrypted_content,
+            tok=self.tok2,
+        )
+
+        # the encrypted event should have marked the user as a participant
+        self.assertTrue(self._is_participant(self.room1, self.user2))