Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a column participant to room_memberships table #18068

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18068.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a column `participant` to `room_memberships` table.
1 change: 1 addition & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
"pushers": ["enabled"],
"redactions": ["have_censored"],
"remote_media_cache": ["authenticated"],
"room_memberships": ["participant"],
"room_stats_state": ["is_federatable"],
"rooms": ["is_public", "has_auth_chain_index"],
"sliding_sync_joined_rooms": ["is_encrypted"],
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,9 @@ async def handle_new_client_event(
)
return prev_event

if event.type == "m.room.message" or event.type == "m.room.encrypted":
await self.store.set_room_participation(event.room_id, event.user_id)

if event.internal_metadata.is_out_of_band_membership():
# the only sort of out-of-band-membership events we expect to see here are
# invite rejections and rescinded knocks that we have generated ourselves.
Expand Down
107 changes: 107 additions & 0 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,35 @@ def _get_rooms_for_user_by_join_date_txn(
from_ts,
)

async def set_room_participation(self, room_id: str, user_id: str) -> None:
"""
Record the provided user as participating in the given room

Args:
room_id: ID of the room to set the participant in
user_id: the user ID of the user
"""
await self.db_pool.simple_update(
"room_memberships",
{"user_id": user_id, "room_id": room_id},
{"participant": True},
"update_room_participation",
)
Comment on lines +1617 to +1622
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that room_memberships is an append-only list of a user's joins/leaves to a room. When setting participant to true, we only need to do this for the most recent row for that user in room_memberships. We can do this by setting a limit of 1 here, and ordering by event_stream_ordering DESC. This should also cut down the query time.

An additional advantage from doing so is that for those that have joined and left a room repeatedly, we'll be able to look back in old rows of room_memberships and answer the question: for the periods that this user was in the room, which of those times did they participate? (Is that a useful signal for T&S purposes?)


async def get_room_participation(self, room_id: str, user_id: str) -> bool:
"""
Check whether a user is listed as a participant in a room

Args:
room_id: ID of the room to check in
user_id: user ID of the user
"""
return await self.db_pool.simple_select_one_onecol(
"room_memberships",
{"user_id": user_id, "room_id": room_id},
"participant",
)
Comment on lines +1632 to +1636
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should order this by event_stream_ordering DESC, otherwise we'll pick a random session for this user in the room.



class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
Expand Down Expand Up @@ -1636,6 +1665,84 @@ def __init__(
columns=["user_id", "room_id"],
)

self.db_pool.updates.register_background_update_handler(
"populate_participant_bg_update", self._populate_participant
)

async def _populate_participant(self, progress: JsonDict, batch_size: int) -> int:
"""
Background update to populate column `participant` on `room_memberships` table
one room at a time
"""
last_room_id = progress.get("last_room_id", "")
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

def _get_current_room_txn(
txn: LoggingTransaction, last_room_id: str
) -> Optional[str]:
sql = """
SELECT room_id from room_memberships WHERE room_id > ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that room_memberships is a table that is always be appended to, and thus always changing under you. It is ordered by its event_stream_ordering column. So, the only way to traverse it while the system is running, without leaving gaps, is to iterate using the event_stream_ordering column.

Note: room_memberships only has rows deleted from it when a room is purged.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we have:

  • a query to pull out a room ID
  • a query that pulls out all users that have ever been joined to that room
  • a query per-user per-room that updates all previous entries that match that user/room combination

I think we can instead do this in one query that processes a batch of room_membership rows all at once. Instead of saving the current room_id for the batch job, start with the currently max event_stream_ordering row and work backwards in batches of say 1000.

Constrain your query to the current event_stream_ordering - BATCH_SIZE. Then within that, UPDATE all rows based on data in the events table. Then save the new event_stream_ordering - BATCH_SIZE to your background job.

Now the table can continue to grow without things changing from underneath you, as historical data is only (rarely) deleted.

ORDER BY room_id
LIMIT 1;
"""
txn.execute(sql, (last_room_id,))
res = txn.fetchone()
if res:
room_id = res[0]
return room_id
else:
return None

def _background_populate_participant_per_room_txn(
txn: LoggingTransaction, current_room_id: str
) -> None:
sql = """
SELECT DISTINCT c.state_key
FROM current_state_events AS c
INNER JOIN events AS e USING(room_id)
WHERE room_id = ?
AND c.membership = 'join'
AND e.type = 'm.room.message'
OR e.type = 'm.room.encrypted'
AND c.state_key = e.sender;
"""
H-Shay marked this conversation as resolved.
Show resolved Hide resolved

txn.execute(sql, (current_room_id,))
res = txn.fetchall()

if res:
participants = [user[0] for user in res]
for participant in participants:
self.db_pool.simple_update_txn(
txn,
table="room_memberships",
keyvalues={"user_id": participant, "room_id": current_room_id},
updatevalues={"participant": True},
)
Comment on lines +1715 to +1720
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: we could batch this query by using simple_update_many. But my other comment may cause this query to be replaced.


current_room_id = await self.db_pool.runInteraction(
"_get_current_room_txn", _get_current_room_txn, last_room_id
)
if not current_room_id:
await self.db_pool.updates._end_background_update(
"populate_participant_bg_update"
)
return 1

await self.db_pool.runInteraction(
"_background_populate_participant_per_room_txn",
_background_populate_participant_per_room_txn,
current_room_id,
)

progress["last_room_id"] = current_room_id
await self.db_pool.runInteraction(
"populate_participant_bg_update",
self.db_pool.updates._background_update_progress_txn,
"populate_participant_bg_update",
progress,
)
return 1

async def _background_add_membership_profile(
self, progress: JsonDict, batch_size: int
) -> int:
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
#

SCHEMA_VERSION = 88 # remember to update the list below when updating
SCHEMA_VERSION = 89 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema

This should be incremented whenever the codebase changes its requirements on the
Expand Down Expand Up @@ -155,6 +155,9 @@
be posted in response to a resettable timeout or an on-demand action.
- Add background update to fix data integrity issue in the
`sliding_sync_membership_snapshots` -> `forgotten` column

Changes in SCHEMA_VERSION = 89
- Add a column `participant` to `room_memberships` table
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- Add a column `participant` to `room_memberships` table to track whether a room member has sent
-- a `m.room.message` event into a room they are a member of
H-Shay marked this conversation as resolved.
Show resolved Hide resolved
ALTER TABLE room_memberships ADD COLUMN participant BOOLEAN DEFAULT FALSE;

-- Add a background update to populate `participant` column
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8901, 'populate_participant_bg_update', '{}');
81 changes: 81 additions & 0 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4070,3 +4070,84 @@ def test_suspended_user_cannot_redact_messages_other_than_their_own(self) -> Non
shorthand=False,
)
self.assertEqual(channel.code, 200)


class RoomParticipantTestCase(unittest.HomeserverTestCase):
servlets = [
login.register_servlets,
room.register_servlets,
profile.register_servlets,
admin.register_servlets,
]

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.user1 = self.register_user("thomas", "hackme")
self.tok1 = self.login("thomas", "hackme")

self.user2 = self.register_user("teresa", "hackme")
self.tok2 = self.login("teresa", "hackme")

self.room1 = self.helper.create_room_as(room_creator=self.user1, tok=self.tok1)
self.room2 = self.helper.create_room_as(
self.user1, is_public=False, tok=self.tok1
)
self.store = hs.get_datastores().main

def test_sending_message_records_participation(self) -> None:
"""
Test that sending an m.room.message event into a room causes the user to
be marked as a participant in that room
"""
self.helper.join(self.room1, self.user2, tok=self.tok2)

# user has not sent any messages, so should not be a participant
participant = self.get_success(
self.store.get_room_participation(self.room1, self.user2)
)
self.assertFalse(participant)

# sending a message should now mark user as participant
self.helper.send_event(
self.room1,
"m.room.message",
content={
"msgtype": "m.text",
"body": "I am engaging in this room",
},
tok=self.tok2,
)
participant = self.get_success(
self.store.get_room_participation(self.room1, self.user2)
)
self.assertTrue(participant)

def test_sending_encrypted_event_records_participation(self) -> None:
"""
Test that sending an m.room.encrypted event into a room causes the user to
be marked as a participant in that room
"""
self.helper.join(self.room1, self.user2, tok=self.tok2)

# user has not sent any messages, so should not be a participant
participant = self.get_success(
self.store.get_room_participation(self.room1, self.user2)
)
self.assertFalse(participant)

# sending an encrypted event should now mark user as participant
self.helper.send_event(
self.room1,
"m.room.encrypted",
content={
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "AwgAEnACgAkLmt6qF84IK++J7UDH2Za1YVchHyprqTqsg...",
"device_id": "RJYKSTBOIE",
"sender_key": "IlRMeOPX2e0MurIyfWEucYBRVOEEUMrOHqn/8mLqMjA",
"session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ",
},
tok=self.tok2,
)
participant = self.get_success(
self.store.get_room_participation(self.room1, self.user2)
)
self.assertTrue(participant)
Loading