From 81189a81a9f968cc3514052fbf87ef4cb62c07bf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 15 Oct 2024 16:56:10 -0500 Subject: [PATCH] Add background update to fix existing databases that have been running with this flaw --- .../databases/main/events_bg_updates.py | 111 ++++++++++++++++++ synapse/storage/schema/__init__.py | 6 +- ..._membership_snapshots_forgotten_column.sql | 23 ++++ synapse/types/storage/__init__.py | 3 + tests/storage/test_sliding_sync_tables.py | 103 ++++++++++++++++ 5 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/main/delta/89/01_fix_sliding_sync_membership_snapshots_forgotten_column.sql diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index a8723f94bc8..7643c46850c 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -304,6 +304,12 @@ def __init__( _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, self._sliding_sync_membership_snapshots_bg_update, ) + # Add a background update tofix data integrity issue in the + # `sliding_sync_membership_snapshots` -> `forgotten` column + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE, + self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update, + ) # We want this to run on the main database at startup before we start processing # events. @@ -2429,6 +2435,111 @@ def _fill_table_txn(txn: LoggingTransaction) -> None: return len(memberships_to_update_rows) + async def _sliding_sync_membership_snapshots_fix_forgotten_column_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to update the `sliding_sync_membership_snapshots` -> + `forgotten` column to be in sync with the `room_memberships` table. + + For any room that someone has forgotten and subsequently re-joined or had any + new membership on, we need to go and update the column to match the + `room_memberships` table as it has fallen out of sync. + """ + last_event_stream_ordering = progress.get( + "last_event_stream_ordering", -(1 << 31) + ) + + # Any row in `sliding_sync_membership_snapshots` with `forgotten=1` we need to recheck + def _find_memberships_to_update_txn( + txn: LoggingTransaction, + ) -> List[Tuple[str, str, str, int]]: + txn.execute( + """ + SELECT + room_id, + user_id, + membership_event_id, + event_stream_ordering + FROM sliding_sync_membership_snapshots + WHERE event_stream_ordering > ? + AND forgotten = 1 + ORDER BY event_stream_ordering ASC + LIMIT ? + """, + (last_event_stream_ordering, batch_size), + ) + + memberships_to_update_rows = cast( + List[Tuple[str, str, str, int]], + txn.fetchall(), + ) + + return memberships_to_update_rows + + memberships_to_update_rows = await self.db_pool.runInteraction( + "_sliding_sync_membership_snapshots_fix_forgotten_column_bg_update._find_memberships_to_update_txn", + _find_memberships_to_update_txn, + ) + + if not memberships_to_update_rows: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE + ) + return 0 + + forgotten_update_query_args: List[Tuple[str, str, str, str]] = [] + for ( + room_id, + user_id, + membership_event_id, + _event_stream_ordering, + ) in memberships_to_update_rows: + forgotten_update_query_args.append( + ( + membership_event_id, + room_id, + user_id, + membership_event_id, + ) + ) + + def _fill_table_txn(txn: LoggingTransaction) -> None: + # Handle updating the `sliding_sync_membership_snapshots` table + # + # We need to find the `forgotten` value during the transaction because + # we can't risk inserting stale data. + txn.execute_batch( + """ + UPDATE sliding_sync_membership_snapshots + SET + forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?) + WHERE room_id = ? and user_id = ? AND membership_event_id = ? + """, + forgotten_update_query_args, + ) + + await self.db_pool.runInteraction( + "sliding_sync_membership_snapshots_fix_forgotten_column_bg_update", + _fill_table_txn, + ) + + # Update the progress + ( + _room_id, + _user_id, + _membership_event_id, + event_stream_ordering, + ) = memberships_to_update_rows[-1] + await self.db_pool.updates._background_update_progress( + _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE, + { + "last_event_stream_ordering": event_stream_ordering, + }, + ) + + return len(memberships_to_update_rows) + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index f171f4568a0..c9da4a00e0e 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 88 # remember to update the list below when updating +SCHEMA_VERSION = 89 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -153,6 +153,10 @@ Changes in SCHEMA_VERSION = 88 - MSC4140: Add `delayed_events` table that keeps track of events that are to be posted in response to a resettable timeout or an on-demand action. + +Changes in SCHEMA_VERSION = 89 + - Add background update to fix data integrity issue in the + `sliding_sync_membership_snapshots` -> `forgotten` column """ diff --git a/synapse/storage/schema/main/delta/89/01_fix_sliding_sync_membership_snapshots_forgotten_column.sql b/synapse/storage/schema/main/delta/89/01_fix_sliding_sync_membership_snapshots_forgotten_column.sql new file mode 100644 index 00000000000..9e2b616be51 --- /dev/null +++ b/synapse/storage/schema/main/delta/89/01_fix_sliding_sync_membership_snapshots_forgotten_column.sql @@ -0,0 +1,23 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + + + +-- Add a background update to update the `sliding_sync_membership_snapshots` -> +-- `forgotten` column to be in sync with the `room_memberships` table. +-- +-- For any room that someone has forgotten and subsequently re-joined or had any new +-- membership on, we need to go and update the column to match the `room_memberships` +-- table as it has fallen out of sync. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8901, 'sliding_sync_membership_snapshots_fix_forgotten_column_bg_update', '{}'); diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py index fae5449bcc3..b5fa20a41a5 100644 --- a/synapse/types/storage/__init__.py +++ b/synapse/types/storage/__init__.py @@ -45,3 +45,6 @@ class _BackgroundUpdates: SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = ( "sliding_sync_membership_snapshots_bg_update" ) + SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = ( + "sliding_sync_membership_snapshots_fix_forgotten_column_bg_update" + ) diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index 35917505a42..53212f7c452 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -5014,3 +5014,106 @@ def test_membership_snapshots_background_update_catch_up_no_membership( }, exact=True, ) + + +class SlidingSyncMembershipSnapshotsTableFixForgottenColumnBackgroundUpdatesTestCase( + SlidingSyncTablesTestCaseBase +): + """ + Test the background updates that fixes `sliding_sync_membership_snapshots` -> + `forgotten` column. + """ + + def test_membership_snapshots_fix_forgotten_column_background_update(self) -> None: + """ + Test that the background update, updates the `sliding_sync_membership_snapshots` + -> `forgotten` column to be in sync with the `room_memberships` table. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # User1 joins the room + self.helper.join(room_id, user1_id, tok=user1_tok) + + # Leave and forget the room + self.helper.leave(room_id, user1_id, tok=user1_tok) + # User1 forgets the room + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + # Re-join the room + self.helper.join(room_id, user1_id, tok=user1_tok) + + # Reset `sliding_sync_membership_snapshots` table as if the `forgotten` column + # got out of sync from the `room_memberships` table from the previous flawed + # code. + self.get_success( + self.store.db_pool.simple_update_one( + table="sliding_sync_membership_snapshots", + keyvalues={"room_id": room_id, "user_id": user1_id}, + updatevalues={"forgotten": 1}, + desc="sliding_sync_membership_snapshots.test_membership_snapshots_fix_forgotten_column_background_update", + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE, + "progress_json": "{}", + }, + ) + ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + # Make sure the table is populated + + sliding_sync_membership_snapshots_results = ( + self._get_sliding_sync_membership_snapshots() + ) + self.assertIncludes( + set(sliding_sync_membership_snapshots_results.keys()), + { + (room_id, user1_id), + (room_id, user2_id), + }, + exact=True, + ) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id) + ) + # Holds the info according to the current state when the user joined. + # + # We only care about checking on user1 as that's what we reset and expect to be + # correct now + self.assertEqual( + sliding_sync_membership_snapshots_results.get((room_id, user1_id)), + _SlidingSyncMembershipSnapshotResult( + room_id=room_id, + user_id=user1_id, + sender=user1_id, + membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id, + membership=Membership.JOIN, + event_stream_ordering=state_map[ + (EventTypes.Member, user1_id) + ].internal_metadata.stream_ordering, + has_known_state=True, + room_type=None, + room_name=None, + is_encrypted=False, + tombstone_successor_room_id=None, + # We should see the room as no longer forgotten + forgotten=False, + ), + )