Skip to content

Sliding sync: use new DB tables #17630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
74bec29
Split out _rewind_current_membership_to_token function
erikjohnston Aug 29, 2024
58071bc
Split out fetching of newly joined/left rooms
erikjohnston Aug 29, 2024
c9a9156
Add DB functions
erikjohnston Aug 29, 2024
e2c0a4b
Use new tables
erikjohnston Aug 29, 2024
86a0730
Add trace
erikjohnston Aug 29, 2024
c038ff9
Proper join
erikjohnston Aug 29, 2024
a278a67
Newsfile
erikjohnston Aug 29, 2024
a027397
Newsfile
erikjohnston Aug 29, 2024
676754d
WIP
erikjohnston Aug 29, 2024
bc4cb1f
Handle state resets in rooms
erikjohnston Aug 29, 2024
bfd36c1
Apply suggestions from code review
erikjohnston Aug 30, 2024
ed4ce95
Update comments
erikjohnston Aug 30, 2024
2980422
Apply suggestions from code review
erikjohnston Aug 30, 2024
6c4ad32
Faster have_finished_sliding_sync_background_jobs
erikjohnston Aug 30, 2024
5d6386a
Use dm_room_ids
erikjohnston Aug 30, 2024
acb57ee
Use filter_membership_for_sync
erikjohnston Aug 30, 2024
82f58bf
Factor out _filter_relevant_room_to_send
erikjohnston Aug 30, 2024
e76954b
Parameterize tests
erikjohnston Aug 30, 2024
f78ab68
Add cache
erikjohnston Aug 30, 2024
e923a8d
Get encryption state at the time
erikjohnston Aug 30, 2024
3c9d994
Comments
erikjohnston Sep 1, 2024
ed4a158
More cache invalidation
erikjohnston Sep 1, 2024
180d176
Merge branch 'erikj/ss_room_list_split' into erikj/ss_new_tables
erikjohnston Sep 1, 2024
b3676c4
Merge remote-tracking branch 'origin/develop' into erikj/ss_new_tables
erikjohnston Sep 1, 2024
b88df94
Comment
erikjohnston Sep 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17630.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use new database tables for sliding sync.
956 changes: 783 additions & 173 deletions synapse/handlers/sliding_sync/room_lists.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (user_id,)
)

# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
Expand Down Expand Up @@ -157,6 +160,7 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)

def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]
Expand Down
21 changes: 20 additions & 1 deletion synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection, Cursor
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import Clock, json_encoder

from . import engines
Expand Down Expand Up @@ -487,6 +487,25 @@ async def has_completed_background_update(self, update_name: str) -> bool:

return not update_exists

async def have_completed_background_updates(
    self, update_names: StrCollection
) -> bool:
    """Check whether all of the given background updates have completed.

    Args:
        update_names: Names of the background updates to check.

    Returns:
        True if `self._all_done` is set, or if none of the named updates
        still has a row in the `background_updates` table. False if at
        least one of them is still listed there.
    """
    # Fast path: skip the database query entirely when the updater already
    # reports that everything is done.
    if self._all_done:
        return True

    rows = await self.db_pool.simple_select_many_batch(
        table="background_updates",
        column="update_name",
        iterable=update_names,
        retcols=("update_name",),
        desc="get_uncompleted_background_updates",
    )

    # If we find any rows then at least one of the updates has not completed.
    return not rows

async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update

Expand Down
11 changes: 11 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -2272,6 +2272,17 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:

return len(memberships_to_update_rows)

async def have_finished_sliding_sync_background_jobs(self) -> bool:
    """Report whether it is safe to use the sliding sync membership tables.

    Returns:
        Whatever the background updater answers when asked if every sliding
        sync background update listed below has completed.
    """
    updater = self.db_pool.updates

    # The sliding sync background updates that are checked for completion.
    required_updates = (
        _BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
        _BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
        _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
    )

    return await updater.have_completed_background_updates(required_updates)


def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
Expand Down
55 changes: 54 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.storage.roommember import (
MemberSummary,
ProfileInfo,
RoomsForUser,
RoomsForUserSlidingSync,
)
from synapse.types import (
JsonDict,
PersistedEventPosition,
Expand Down Expand Up @@ -1377,6 +1382,54 @@ async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
desc="room_forgetter_stream_pos",
)

@cached(iterable=True, max_entries=10000)
async def get_sliding_sync_rooms_for_user(
self,
user_id: str,
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.

Ignores forgotten rooms and rooms that the user has been kicked from.

Returns:
Map from room ID to membership info
"""

def get_sliding_sync_rooms_for_user_txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
Copy link
Contributor

@MadLittleMods MadLittleMods Aug 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're simply fetching out room_version_id just because that's what's in a RoomsForUser. We don't use it anywhere in the Sliding Sync code (as far as I can tell).

  • We can remove it from RoomsForUserSlidingSync
  • We should just use a different data type. Probably the same as RoomsForUser but without room_version_id. We could have RoomsForUserSlidingSync and RoomsForUserSlidingSyncWithState for example

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be addressed in a future PR. Probably would be good to create an issue for this one

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah. So, I had the same thought about removing it, but it turns out that sync v2 specifically checks that the room version is known and so we should probably do something similar in sliding sync as well.

m.event_instance_name, m.event_stream_ordering,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
"""
txn.execute(sql, (user_id,))
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=row[8],
)
for row in txn
}

return await self.db_pool.runInteraction(
"get_sliding_sync_rooms_for_user",
get_sliding_sync_rooms_for_user_txn,
)


class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
Expand Down
13 changes: 13 additions & 0 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ class RoomsForUser:
room_version_id: str


@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class RoomsForUserSlidingSync:
    """Membership info for one room of a user, as needed by sliding sync.

    `get_sliding_sync_rooms_for_user` builds one instance per row it reads
    from `sliding_sync_membership_snapshots` and returns them keyed by room
    ID.
    """

    # ID of the room this entry describes.
    room_id: str
    # Filled from the snapshot row's `sender` column; may be None.
    sender: Optional[str]
    # The user's membership in the room (e.g. "join").
    membership: str
    # Filled from the snapshot row's `membership_event_id` column; may be
    # None.
    event_id: Optional[str]
    # Position built from the snapshot row's `event_instance_name` and
    # `event_stream_ordering` columns.
    event_pos: PersistedEventPosition
    # Filled from the `room_version` column of the `rooms` table.
    room_version_id: str

    # For joined rooms these two prefer the value from
    # `sliding_sync_joined_rooms` and fall back to the value stored on the
    # membership snapshot; for other memberships only the snapshot is used.
    room_type: Optional[str]
    is_encrypted: bool


@attr.s(slots=True, frozen=True, weakref_slot=False, auto_attribs=True)
class GetRoomsForUserWithStreamOrdering:
room_id: str
Expand Down
16 changes: 15 additions & 1 deletion tests/rest/client/sliding_sync/test_connection_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#
import logging

from parameterized import parameterized
from parameterized import parameterized, parameterized_class

from twisted.test.proto_helpers import MemoryReactor

Expand All @@ -28,6 +28,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
"""
Test connection tracking in the Sliding Sync API.
Expand All @@ -44,6 +56,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()

super().prepare(reactor, clock, hs)

def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
"""Test that we only get state updates in incremental sync for rooms
we've already seen (LIVE).
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
"""Tests for the account_data sliding sync extension"""

Expand All @@ -43,6 +57,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.account_data_handler = hs.get_account_data_handler()

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the account_data extension works during an initial sync,
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_e2ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -27,6 +29,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
"""Tests for the e2ee sliding sync extension"""

Expand All @@ -42,6 +56,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.e2e_keys_handler = hs.get_e2e_keys_handler()

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling e2ee extension works during an initial sync, even if there
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
"""Tests for the receipts sliding sync extension"""

Expand All @@ -42,6 +56,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the receipts extension works during an initial sync,
Expand Down
15 changes: 15 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_to_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import logging
from typing import List

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
"""Tests for the to-device sliding sync extension"""

Expand All @@ -40,6 +54,7 @@ class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
super().prepare(reactor, clock, hs)

def _assert_to_device_response(
self, response_body: JsonDict, expected_messages: List[JsonDict]
Expand Down
16 changes: 16 additions & 0 deletions tests/rest/client/sliding_sync/test_extension_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#
import logging

from parameterized import parameterized_class

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
Expand All @@ -28,6 +30,18 @@
logger = logging.getLogger(__name__)


# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
@parameterized_class(
("use_new_tables",),
[
(True,),
(False,),
],
class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{'new' if params_dict['use_new_tables'] else 'fallback'}",
)
class SlidingSyncTypingExtensionTestCase(SlidingSyncBase):
"""Tests for the typing notification sliding sync extension"""

Expand All @@ -41,6 +55,8 @@ class SlidingSyncTypingExtensionTestCase(SlidingSyncBase):
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main

super().prepare(reactor, clock, hs)

def test_no_data_initial_sync(self) -> None:
"""
Test that enabling the typing extension works during an initial sync,
Expand Down
Loading
Loading