diff --git a/changelog.d/17658.misc b/changelog.d/17658.misc new file mode 100644 index 00000000000..0bdbc1140db --- /dev/null +++ b/changelog.d/17658.misc @@ -0,0 +1 @@ +Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 444cc32f368..7340c6ec053 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -1040,29 +1040,67 @@ async def get_room_sync_data( ) ) - # By default, just choose the membership event position + # Figure out the last bump event in the room + # + # By default, just choose the membership event position for any non-join membership bump_stamp = room_membership_for_user_at_to_token.event_pos.stream - - # Figure out the last bump event in the room if we're in the room. + # If we're joined to the room, we need to find the last bump event before the + # `to_token` if room_membership_for_user_at_to_token.membership == Membership.JOIN: - last_bump_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, - to_token.room_key, - event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, - ) + # We can quickly query for the latest bump event in the room using the + # sliding sync tables. + latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room( + room_id ) - # But if we found a bump event, use that instead - if last_bump_event_result is not None: - _, new_bump_event_pos = last_bump_event_result + min_to_token_position = to_token.room_key.stream - # If we've just joined a remote room, then the last bump event may - # have been backfilled (and so have a negative stream ordering). - # These negative stream orderings can't sensibly be compared, so - # instead we use the membership event position. 
- if new_bump_event_pos.stream > 0: - bump_stamp = new_bump_event_pos.stream + # If we can rely on the new sliding sync tables and the `bump_stamp` is + # `None`, just fallback to the membership event position. This can happen + # when we've just joined a remote room and all the events are backfilled. + if ( + # FIXME: The background job check can be removed once we bump + # `SCHEMA_COMPAT_VERSION` and run the foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` + # (tracked by https://github.com/element-hq/synapse/issues/17623) + await self.store.have_finished_sliding_sync_background_jobs() + and latest_room_bump_stamp is None + ): + pass + + # The `bump_stamp` stored in the database might be ahead of our token. Since + # `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure + # that's before the `to_token` in all scenarios. The only scenario we can be + # sure of is if the `bump_stamp` is totally before the minimum position from + # the token. + # + # We don't need to check if the background update has finished, as if the + # returned bump stamp is not None then it must be up to date. + elif ( + latest_room_bump_stamp is not None + and latest_room_bump_stamp < min_to_token_position + ): + bump_stamp = latest_room_bump_stamp + + # Otherwise, if it's within or after the `to_token`, we need to find the + # last bump event before the `to_token`. + else: + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, + to_token.room_key, + event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, + ) + ) + if last_bump_event_result is not None: + _, new_bump_event_pos = last_bump_event_result + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. 
+ if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream unstable_expanded_timeline = False prev_room_sync_config = previous_connection_state.room_configs.get(room_id) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index d423d80efa7..e5f63019fda 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -327,6 +327,13 @@ async def _persist_events_and_state_updates( async with stream_ordering_manager as stream_orderings: for (event, _), stream in zip(events_and_contexts, stream_orderings): + # XXX: We can't rely on `stream_ordering`/`instance_name` being correct + # at this point. We could be working with events that were previously + # persisted as an `outlier` with one `stream_ordering` but are now being + # persisted again and de-outliered and are being assigned a different + # `stream_ordering` here that won't end up being used. + # `_update_outliers_txn()` will fix this discrepancy (always use the + # `stream_ordering` from the first time it was persisted). event.internal_metadata.stream_ordering = stream event.internal_metadata.instance_name = self._instance_name @@ -470,11 +477,11 @@ async def _calculate_sliding_sync_table_changes( membership_infos_to_insert_membership_snapshots.append( # XXX: We don't use `SlidingSyncMembershipInfoWithEventPos` here # because we're sourcing the event from `events_and_contexts`, we - # can't rely on `stream_ordering`/`instance_name` being correct. We - # could be working with events that were previously persisted as an - # `outlier` with one `stream_ordering` but are now being persisted - # again and de-outliered and assigned a different `stream_ordering` - # that won't end up being used. Since we call + # can't rely on `stream_ordering`/`instance_name` being correct at + # this point. 
We could be working with events that were previously + # persisted as an `outlier` with one `stream_ordering` but are now + # being persisted again and de-outliered and assigned a different + # `stream_ordering` that won't end up being used. Since we call # `_calculate_sliding_sync_table_changes()` before # `_update_outliers_txn()` which fixes this discrepancy (always use # the `stream_ordering` from the first time it was persisted), we're @@ -591,11 +598,17 @@ async def _calculate_sliding_sync_table_changes( event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, ) ) - bump_stamp_to_fully_insert = ( - most_recent_bump_event_pos_results[1].stream - if most_recent_bump_event_pos_results is not None - else None - ) + if most_recent_bump_event_pos_results is not None: + _, new_bump_event_pos = most_recent_bump_event_pos_results + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead just leave it as `None` in the table and we will use their + # membership event position as the bump event position in the + # Sliding Sync API. + if new_bump_event_pos.stream > 0: + bump_stamp_to_fully_insert = new_bump_event_pos.stream current_state_ids_map = dict( await self.store.get_partial_filtered_current_state_ids( @@ -2123,31 +2136,26 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn( if len(events_and_contexts) == 0: return - # We only update the sliding sync tables for non-backfilled events. - # - # Check if the first event is a backfilled event (with a negative - # `stream_ordering`). If one event is backfilled, we assume this whole batch was - # backfilled. 
- first_event_stream_ordering = events_and_contexts[0][ - 0 - ].internal_metadata.stream_ordering - # This should exist for persisted events - assert first_event_stream_ordering is not None - if first_event_stream_ordering < 0: - return - # Since the list is sorted ascending by `stream_ordering`, the last event should # have the highest `stream_ordering`. max_stream_ordering = events_and_contexts[-1][ 0 ].internal_metadata.stream_ordering + # `stream_ordering` should be assigned for persisted events + assert max_stream_ordering is not None + # Check if the event is a backfilled event (with a negative `stream_ordering`). + # If one event is backfilled, we assume this whole batch was backfilled. + if max_stream_ordering < 0: + # We only update the sliding sync tables for non-backfilled events. + return + max_bump_stamp = None for event, _ in reversed(events_and_contexts): # Sanity check that all events belong to the same room assert event.room_id == room_id if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES: - # This should exist for persisted events + # `stream_ordering` should be assigned for persisted events assert event.internal_metadata.stream_ordering is not None max_bump_stamp = event.internal_metadata.stream_ordering @@ -2156,11 +2164,6 @@ def _update_sliding_sync_tables_with_new_persisted_events_txn( # matching bump event which should have the highest `stream_ordering`. break - # We should have exited earlier if there were no events - assert ( - max_stream_ordering is not None - ), "Expected to have a stream_ordering if we have events" - # Handle updating the `sliding_sync_joined_rooms` table. 
# txn.execute( diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index dc747d7ac0a..83939d10b0a 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -41,6 +41,46 @@ class SlidingSyncStore(SQLBaseStore): + async def get_latest_bump_stamp_for_room( + self, + room_id: str, + ) -> Optional[int]: + """ + Get the `bump_stamp` for the room. + + The `bump_stamp` is the `stream_ordering` of the last event according to the + `bump_event_types`. This helps clients sort more readily without them needing to + pull in a bunch of the timeline to determine the last activity. + `bump_event_types` is a thing because for example, we don't want display name + changes to mark the room as unread and bump it to the top. For encrypted rooms, + we just have to consider any activity as a bump because we can't see the content + and the client has to figure it out for themselves. + + This should only be called where the server is participating + in the room (someone local is joined). + + Returns: + The `bump_stamp` for the room (which can be `None`). + """ + + return cast( + Optional[int], + await self.db_pool.simple_select_one_onecol( + table="sliding_sync_joined_rooms", + keyvalues={"room_id": room_id}, + retcol="bump_stamp", + # FIXME: This should be `False` once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked + # by https://github.com/element-hq/synapse/issues/17623) + # + # This should be `allow_none=False` in the future because even though + # `bump_stamp` itself can be `None`, we should have a row in the + # `sliding_sync_joined_rooms` table for any joined room. 
+ allow_none=True, + ), + ) + async def persist_per_connection_state( self, user_id: str, diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index 621f46fff82..de80ad53cd8 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -106,6 +106,12 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: assert persist_events_store is not None self.persist_events_store = persist_events_store + persist_controller = self.hs.get_storage_controllers().persistence + assert persist_controller is not None + self.persist_controller = persist_controller + + self.state_handler = self.hs.get_state_handler() + def _get_sliding_sync_joined_rooms(self) -> Dict[str, _SlidingSyncJoinedRoomResult]: """ Return the rows from the `sliding_sync_joined_rooms` table. @@ -260,10 +266,8 @@ def _create_remote_invite_room_for_user( ) ) context = EventContext.for_outlier(self.hs.get_storage_controllers()) - persist_controller = self.hs.get_storage_controllers().persistence - assert persist_controller is not None persisted_event, _, _ = self.get_success( - persist_controller.persist_event(invite_event, context) + self.persist_controller.persist_event(invite_event, context) ) self._remote_invite_count += 1 @@ -316,10 +320,8 @@ def _retract_remote_invite_for_user( ) ) context = EventContext.for_outlier(self.hs.get_storage_controllers()) - persist_controller = self.hs.get_storage_controllers().persistence - assert persist_controller is not None persisted_event, _, _ = self.get_success( - persist_controller.persist_event(kick_event, context) + self.persist_controller.persist_event(kick_event, context) ) return persisted_event @@ -926,6 +928,201 @@ def test_joined_room_is_bumped(self) -> None: user2_snapshot, ) + def test_joined_room_bump_stamp_backfill(self) -> None: + """ + Test that `bump_stamp` ignores backfilled events, i.e. events with a + negative stream ordering. 
+ """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + + # Create a remote room + creator = "@user:other" + room_id = "!foo:other" + room_version = RoomVersions.V10 + shared_kwargs = { + "room_id": room_id, + "room_version": room_version.identifier, + } + + create_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[], + type=EventTypes.Create, + state_key="", + content={ + # The `ROOM_CREATOR` field could be removed if we used a room + # version > 10 (in favor of relying on `sender`) + EventContentFields.ROOM_CREATOR: creator, + EventContentFields.ROOM_VERSION: room_version.identifier, + }, + sender=creator, + **shared_kwargs, + ) + ) + creator_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[create_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id], + type=EventTypes.Member, + state_key=creator, + content={"membership": Membership.JOIN}, + sender=creator, + **shared_kwargs, + ) + ) + room_name_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[creator_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id], + type=EventTypes.Name, + state_key="", + content={ + EventContentFields.ROOM_NAME: "my super duper room", + }, + sender=creator, + **shared_kwargs, + ) + ) + # We add a message event as a valid "bump type" + msg_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[room_name_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id], + type=EventTypes.Message, + content={"body": "foo", "msgtype": "m.text"}, + sender=creator, + **shared_kwargs, + ) + ) + invite_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[msg_tuple[0].event_id], + auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id], + type=EventTypes.Member, + state_key=user1_id, + content={"membership": Membership.INVITE}, + sender=creator, + **shared_kwargs, + ) + ) 
+ + remote_events_and_contexts = [ + create_tuple, + creator_tuple, + room_name_tuple, + msg_tuple, + invite_tuple, + ] + + # Ensure the local HS knows the room version + self.get_success(self.store.store_room(room_id, creator, False, room_version)) + + # Persist these events as backfilled events. + for event, context in remote_events_and_contexts: + self.get_success( + self.persist_controller.persist_event(event, context, backfilled=True) + ) + + # Now we join the local user to the room. We want to make this feel as close to + # the real `process_remote_join()` as possible but we'd like to avoid some of + # the auth checks that would be done in the real code. + # + # FIXME: The test was originally written using this less-real + # `persist_event(...)` shortcut but it would be nice to use the real remote join + # process in a `FederatingHomeserverTestCase`. + flawed_join_tuple = self.get_success( + create_event( + self.hs, + prev_event_ids=[invite_tuple[0].event_id], + # This doesn't work correctly to create an `EventContext` that includes + # both of these state events. I assume it's because we're working on our + # local homeserver which has the remote state set as `outlier`. We have + # to create our own EventContext below to get this right. + auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id], + type=EventTypes.Member, + state_key=user1_id, + content={"membership": Membership.JOIN}, + sender=user1_id, + **shared_kwargs, + ) + ) + # We have to create our own context to get the state set correctly. If we use + # the `EventContext` from the `flawed_join_tuple`, the `current_state_events` + # table will only have the join event in it which should never happen in our + # real server. 
+ join_event = flawed_join_tuple[0] + join_context = self.get_success( + self.state_handler.compute_event_context( + join_event, + state_ids_before_event={ + (e.type, e.state_key): e.event_id + for e in [create_tuple[0], invite_tuple[0], room_name_tuple[0]] + }, + partial_state=False, + ) + ) + join_event, _join_event_pos, _room_token = self.get_success( + self.persist_controller.persist_event(join_event, join_context) + ) + + # Make sure the tables are populated correctly + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + {room_id}, + exact=True, + ) + self.assertEqual( + sliding_sync_joined_rooms_results[room_id], + _SlidingSyncJoinedRoomResult( + room_id=room_id, + # This should be the last event in the room (the join membership) + event_stream_ordering=join_event.internal_metadata.stream_ordering, + # Since all of the bump events are backfilled, the `bump_stamp` should + # still be `None`. 
(and we will fall back to the user's membership event + # position in the Sliding Sync API) + bump_stamp=None, + room_type=None, + # We still pick up state of the room even if it's backfilled + room_name="my super duper room", + is_encrypted=False, + tombstone_successor_room_id=None, + ), + ) + + sliding_sync_membership_snapshots_results = ( + self._get_sliding_sync_membership_snapshots() + ) + self.assertIncludes( + set(sliding_sync_membership_snapshots_results.keys()), + { + (room_id, user1_id), + }, + exact=True, + ) + self.assertEqual( + sliding_sync_membership_snapshots_results.get((room_id, user1_id)), + _SlidingSyncMembershipSnapshotResult( + room_id=room_id, + user_id=user1_id, + sender=user1_id, + membership_event_id=join_event.event_id, + membership=Membership.JOIN, + event_stream_ordering=join_event.internal_metadata.stream_ordering, + has_known_state=True, + room_type=None, + room_name="my super duper room", + is_encrypted=False, + tombstone_successor_room_id=None, + ), + ) + @parameterized.expand( # Test both an insert an upsert into the # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` to exercise @@ -1036,11 +1233,9 @@ def test_joined_room_outlier_and_deoutlier( context = self.get_success(unpersisted_context.persist(event)) events_to_persist.append((event, context)) - persist_controller = self.hs.get_storage_controllers().persistence - assert persist_controller is not None for event, context in events_to_persist: self.get_success( - persist_controller.persist_event( + self.persist_controller.persist_event( event, context, )