Skip to content

Commit

Permalink
Add background update to fix existing databases that have been runnin…
Browse files Browse the repository at this point in the history
…g with this flaw
  • Loading branch information
MadLittleMods committed Oct 15, 2024
1 parent 9fda8d0 commit 81189a8
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 1 deletion.
111 changes: 111 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ def __init__(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
self._sliding_sync_membership_snapshots_bg_update,
)
# Add a background update tofix data integrity issue in the
# `sliding_sync_membership_snapshots` -> `forgotten` column
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
)

# We want this to run on the main database at startup before we start processing
# events.
Expand Down Expand Up @@ -2429,6 +2435,111 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:

return len(memberships_to_update_rows)

async def _sliding_sync_membership_snapshots_fix_forgotten_column_bg_update(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Background update to update the `sliding_sync_membership_snapshots` ->
`forgotten` column to be in sync with the `room_memberships` table.
For any room that someone has forgotten and subsequently re-joined or had any
new membership on, we need to go and update the column to match the
`room_memberships` table as it has fallen out of sync.
"""
last_event_stream_ordering = progress.get(
"last_event_stream_ordering", -(1 << 31)
)

# Any row in `sliding_sync_membership_snapshots` with `forgotten=1` we need to recheck
def _find_memberships_to_update_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int]]:
txn.execute(
"""
SELECT
room_id,
user_id,
membership_event_id,
event_stream_ordering
FROM sliding_sync_membership_snapshots
WHERE event_stream_ordering > ?
AND forgotten = 1
ORDER BY event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
)

memberships_to_update_rows = cast(
List[Tuple[str, str, str, int]],
txn.fetchall(),
)

return memberships_to_update_rows

memberships_to_update_rows = await self.db_pool.runInteraction(
"_sliding_sync_membership_snapshots_fix_forgotten_column_bg_update._find_memberships_to_update_txn",
_find_memberships_to_update_txn,
)

if not memberships_to_update_rows:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE
)
return 0

forgotten_update_query_args: List[Tuple[str, str, str, str]] = []
for (
room_id,
user_id,
membership_event_id,
_event_stream_ordering,
) in memberships_to_update_rows:
forgotten_update_query_args.append(
(
membership_event_id,
room_id,
user_id,
membership_event_id,
)
)

def _fill_table_txn(txn: LoggingTransaction) -> None:
# Handle updating the `sliding_sync_membership_snapshots` table
#
# We need to find the `forgotten` value during the transaction because
# we can't risk inserting stale data.
txn.execute_batch(
"""
UPDATE sliding_sync_membership_snapshots
SET
forgotten = (SELECT forgotten FROM room_memberships WHERE event_id = ?)
WHERE room_id = ? and user_id = ? AND membership_event_id = ?
""",
forgotten_update_query_args,
)

await self.db_pool.runInteraction(
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update",
_fill_table_txn,
)

# Update the progress
(
_room_id,
_user_id,
_membership_event_id,
event_stream_ordering,
) = memberships_to_update_rows[-1]
await self.db_pool.updates._background_update_progress(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
{
"last_event_stream_ordering": event_stream_ordering,
},
)

return len(memberships_to_update_rows)


def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
Expand Down
6 changes: 5 additions & 1 deletion synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#
#

SCHEMA_VERSION = 88 # remember to update the list below when updating
SCHEMA_VERSION = 89 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
Expand Down Expand Up @@ -153,6 +153,10 @@
Changes in SCHEMA_VERSION = 88
- MSC4140: Add `delayed_events` table that keeps track of events that are to
be posted in response to a resettable timeout or an on-demand action.
Changes in SCHEMA_VERSION = 89
- Add background update to fix data integrity issue in the
`sliding_sync_membership_snapshots` -> `forgotten` column
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.



-- Add a background update to update the `sliding_sync_membership_snapshots` ->
-- `forgotten` column to be in sync with the `room_memberships` table.
--
-- For any room that someone has forgotten and subsequently re-joined or had any new
-- membership on, we need to go and update the column to match the `room_memberships`
-- table as it has fallen out of sync.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8901, 'sliding_sync_membership_snapshots_fix_forgotten_column_bg_update', '{}');
3 changes: 3 additions & 0 deletions synapse/types/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ class _BackgroundUpdates:
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
"sliding_sync_membership_snapshots_bg_update"
)
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = (
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update"
)
103 changes: 103 additions & 0 deletions tests/storage/test_sliding_sync_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -5014,3 +5014,106 @@ def test_membership_snapshots_background_update_catch_up_no_membership(
},
exact=True,
)


class SlidingSyncMembershipSnapshotsTableFixForgottenColumnBackgroundUpdatesTestCase(
SlidingSyncTablesTestCaseBase
):
"""
Test the background updates that fixes `sliding_sync_membership_snapshots` ->
`forgotten` column.
"""

def test_membership_snapshots_fix_forgotten_column_background_update(self) -> None:
"""
Test that the background update, updates the `sliding_sync_membership_snapshots`
-> `forgotten` column to be in sync with the `room_memberships` table.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")

room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# User1 joins the room
self.helper.join(room_id, user1_id, tok=user1_tok)

# Leave and forget the room
self.helper.leave(room_id, user1_id, tok=user1_tok)
# User1 forgets the room
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{room_id}/forget",
content={},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)

# Re-join the room
self.helper.join(room_id, user1_id, tok=user1_tok)

# Reset `sliding_sync_membership_snapshots` table as if the `forgotten` column
# got out of sync from the `room_memberships` table from the previous flawed
# code.
self.get_success(
self.store.db_pool.simple_update_one(
table="sliding_sync_membership_snapshots",
keyvalues={"room_id": room_id, "user_id": user1_id},
updatevalues={"forgotten": 1},
desc="sliding_sync_membership_snapshots.test_membership_snapshots_fix_forgotten_column_background_update",
)
)

# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{
"update_name": _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
"progress_json": "{}",
},
)
)
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()

# Make sure the table is populated

sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(room_id, user1_id),
(room_id, user2_id),
},
exact=True,
)
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)
# Holds the info according to the current state when the user joined.
#
# We only care about checking on user1 as that's what we reset and expect to be
# correct now
self.assertEqual(
sliding_sync_membership_snapshots_results.get((room_id, user1_id)),
_SlidingSyncMembershipSnapshotResult(
room_id=room_id,
user_id=user1_id,
sender=user1_id,
membership_event_id=state_map[(EventTypes.Member, user1_id)].event_id,
membership=Membership.JOIN,
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
tombstone_successor_room_id=None,
# We should see the room as no longer forgotten
forgotten=False,
),
)

0 comments on commit 81189a8

Please sign in to comment.