Speed up sliding sync by computing extensions in parallel #17884

Merged · 6 commits · Oct 30, 2024
1 change: 1 addition & 0 deletions changelog.d/17884.misc
@@ -0,0 +1 @@
Minor speed-up of sliding sync by computing extensions results in parallel.
39 changes: 28 additions & 11 deletions synapse/handlers/sliding_sync/extensions.py
@@ -49,7 +49,10 @@
SlidingSyncConfig,
SlidingSyncResult,
)
from synapse.util.async_helpers import concurrently_execute
from synapse.util.async_helpers import (
concurrently_execute,
gather_optional_coroutines,
)

if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -97,26 +100,26 @@ async def get_extensions_response(
if sync_config.extensions is None:
return SlidingSyncResult.Extensions()

to_device_response = None
to_device_coro = None
if sync_config.extensions.to_device is not None:
to_device_response = await self.get_to_device_extension_response(
to_device_coro = self.get_to_device_extension_response(
sync_config=sync_config,
to_device_request=sync_config.extensions.to_device,
to_token=to_token,
)

e2ee_response = None
e2ee_coro = None
if sync_config.extensions.e2ee is not None:
e2ee_response = await self.get_e2ee_extension_response(
e2ee_coro = self.get_e2ee_extension_response(
sync_config=sync_config,
e2ee_request=sync_config.extensions.e2ee,
to_token=to_token,
from_token=from_token,
)

account_data_response = None
account_data_coro = None
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
account_data_coro = self.get_account_data_extension_response(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
@@ -127,9 +130,9 @@ async def get_extensions_response(
from_token=from_token,
)

receipts_response = None
receipts_coro = None
if sync_config.extensions.receipts is not None:
receipts_response = await self.get_receipts_extension_response(
receipts_coro = self.get_receipts_extension_response(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
new_connection_state=new_connection_state,
@@ -141,9 +144,9 @@ async def get_extensions_response(
from_token=from_token,
)

typing_response = None
typing_coro = None
if sync_config.extensions.typing is not None:
typing_response = await self.get_typing_extension_response(
typing_coro = self.get_typing_extension_response(
sync_config=sync_config,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
@@ -153,6 +156,20 @@ async def get_extensions_response(
from_token=from_token,
)

(
to_device_response,
e2ee_response,
account_data_response,
receipts_response,
typing_response,
) = await gather_optional_coroutines(
to_device_coro,
e2ee_coro,
account_data_coro,
receipts_coro,
typing_coro,
)

return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
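
The handler change above turns five sequential awaits into five coroutine objects that are created up front and awaited together. The same semantics, sketched in plain asyncio purely for illustration (Synapse itself cannot use asyncio.gather here because it must preserve log contexts; see the new helpers and the review discussion below):

import asyncio
from typing import Any, Coroutine, Optional, Tuple


async def gather_optional(
    *coroutines: Optional[Coroutine[Any, Any, Any]],
) -> Tuple[Optional[Any], ...]:
    # Run the non-None coroutines concurrently; keep None in the slots that
    # were not requested.
    pending = [c for c in coroutines if c is not None]
    results = iter(await asyncio.gather(*pending))
    return tuple(next(results) if c is not None else None for c in coroutines)


async def main() -> None:
    async def to_device() -> str:
        await asyncio.sleep(0.1)
        return "to_device response"

    async def receipts() -> str:
        await asyncio.sleep(0.1)
        return "receipts response"

    # The e2ee extension was not requested, so its slot is simply None.
    to_device_res, e2ee_res, receipts_res = await gather_optional(
        to_device(), None, receipts()
    )
    print(to_device_res, e2ee_res, receipts_res)


asyncio.run(main())
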
40 changes: 40 additions & 0 deletions synapse/logging/context.py
@@ -37,6 +37,7 @@
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Optional,
@@ -850,6 +851,45 @@ def run_in_background(
return d


def run_coroutine_in_background(
coroutine: typing.Coroutine[Any, Any, R],
) -> "defer.Deferred[R]":
"""Run the coroutine, ensuring that the current context is restored after
return from the function, and that the sentinel context is set once the
deferred returned by the function completes.

Useful for wrapping coroutines that you don't yield or await on (for
instance because you want to pass it to deferred.gatherResults()).

This is a special case of `run_in_background` where we can accept a
coroutine directly rather than a function. We can do this because coroutines
do not run until called, and so calling an async function without awaiting
cannot change the log contexts.
"""

current = current_context()
d = defer.ensureDeferred(coroutine)

# The function may have reset the context before returning, so
# we need to restore it now.
ctx = set_current_context(current)

# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
d.addBoth(_set_context_cb, ctx)
return d


T = TypeVar("T")


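
As the review discussion further down explains, run_coroutine_in_background is the "fork" half of Synapse's log-context discipline: fork with run_in_background or run_coroutine_in_background, join with make_deferred_yieldable. A minimal sketch of how the new function combines with the existing helpers, mirroring what gather_optional_coroutines does below; the square coroutine is a hypothetical stand-in for real work:

from typing import List

from twisted.internet import defer

from synapse.logging.context import (
    make_deferred_yieldable,
    run_coroutine_in_background,
)


async def run_squares_concurrently() -> List[int]:
    async def square(n: int) -> int:
        return n * n

    # Fork: each coroutine is started as a Deferred, and the current log
    # context is restored immediately so this function can carry on.
    deferreds = [run_coroutine_in_background(square(n)) for n in range(3)]

    # Join: wait for all of them, keeping the log-context rules intact.
    return await make_deferred_yieldable(
        defer.gatherResults(deferreds, consumeErrors=True)
    )
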
110 changes: 109 additions & 1 deletion synapse/util/async_helpers.py
@@ -51,7 +51,7 @@
)

import attr
from typing_extensions import Concatenate, Literal, ParamSpec
from typing_extensions import Concatenate, Literal, ParamSpec, Unpack

from twisted.internet import defer
from twisted.internet.defer import CancelledError
@@ -61,6 +61,7 @@
from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
run_coroutine_in_background,
run_in_background,
)
from synapse.util import Clock
@@ -344,6 +345,7 @@ async def yieldable_gather_results_delaying_cancellation(
T2 = TypeVar("T2")
T3 = TypeVar("T3")
T4 = TypeVar("T4")
T5 = TypeVar("T5")


@overload
@@ -402,6 +404,112 @@ def gather_results( # type: ignore[misc]
return deferred.addCallback(tuple)


@overload
async def gather_optional_coroutines(
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]]]],
) -> Tuple[Optional[T1]]: ...


@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
]
],
) -> Tuple[Optional[T1], Optional[T2]]: ...


@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
]
],
) -> Tuple[Optional[T1], Optional[T2], Optional[T3]]: ...


@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
Optional[Coroutine[Any, Any, T4]],
]
],
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4]]: ...


@overload
async def gather_optional_coroutines(
*coroutines: Unpack[
Tuple[
Optional[Coroutine[Any, Any, T1]],
Optional[Coroutine[Any, Any, T2]],
Optional[Coroutine[Any, Any, T3]],
Optional[Coroutine[Any, Any, T4]],
Optional[Coroutine[Any, Any, T5]],
]
],
) -> Tuple[Optional[T1], Optional[T2], Optional[T3], Optional[T4], Optional[T5]]: ...


async def gather_optional_coroutines(
*coroutines: Unpack[Tuple[Optional[Coroutine[Any, Any, T1]], ...]],
) -> Tuple[Optional[T1], ...]:
Comment on lines +463 to +465

Contributor: I'm surprised this sort of thing doesn't already exist in the API, like Promise.all(...) in JavaScript. Is it because we can't use asyncio yet? If it's just a difference in how None is handled, could we wrap None in a coroutine (the Python equivalent of Promise.resolve(None)) so we're always working with a coroutine?

Member (author): The reason we need to do this manually, rather than using built-ins, is that we need to correctly handle log contexts. As a quick summary: whenever we "fork off" execution of some async code we need to use run_in_background (or run_as_background_process if we're not going to wait on the result later), and when we "join" again we need to use make_deferred_yieldable. That is basically what gather_optional_coroutines does: it spawns the coroutines as background tasks and then calls make_deferred_yieldable to wait for them all to complete.

Contributor: This context would be useful to have in the docstrings. Log contexts are mentioned, but not the consequences or why we need to care.

"""Helper function that allows waiting on multiple coroutines at once.

The return value is a tuple of the return values of the coroutines in order.

If a `None` is passed instead of a coroutine, it will be ignored and a None
is returned in the tuple.

Note: For typechecking we need to have an explicit overload for each
distinct number of coroutines passed in. If you see type problems, it's
likely because you're using many arguments and you need to add a new
overload above.
"""

try:
results = await make_deferred_yieldable(
defer.gatherResults(
[
run_coroutine_in_background(coroutine)
for coroutine in coroutines
if coroutine is not None
],
consumeErrors=True,
)
)

results_iter = iter(results)
return tuple(
next(results_iter) if coroutine is not None else None
for coroutine in coroutines
)
except defer.FirstError as dfe:
# unwrap the error from defer.gatherResults.

# The raised exception's traceback only includes func() etc if
# the 'await' happens before the exception is thrown - ie if the failure
# happens *asynchronously* - otherwise Twisted throws away the traceback as it
# could be large.
#
# We could maybe reconstruct a fake traceback from Failure.frames. Or maybe
# we could throw Twisted into the fires of Mordor.

# suppress exception chaining, because the FirstError doesn't tell us anything
# very interesting.
assert isinstance(dfe.subFailure.value, BaseException)
raise dfe.subFailure.value from None


@attr.s(slots=True, auto_attribs=True)
class _LinearizerEntry:
# The number of things executing.
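
The review thread above asks why a built-in such as asyncio.gather or a Promise.all equivalent isn't used; the answer is the log-context handling baked into this helper. A minimal usage sketch of gather_optional_coroutines, assuming it runs inside Synapse where a log context is already active; the two inner coroutines are hypothetical stand-ins for per-extension work:

from typing import Optional, Tuple

from synapse.util.async_helpers import gather_optional_coroutines


async def load_counts(want_receipts: bool) -> Tuple[Optional[int], Optional[int]]:
    async def count_to_device() -> int:
        return 3

    async def count_receipts() -> int:
        return 7

    to_device_coro = count_to_device()
    receipts_coro = count_receipts() if want_receipts else None

    # Non-None coroutines run concurrently; a None argument simply comes back
    # as None in the corresponding slot of the result tuple.
    return await gather_optional_coroutines(to_device_coro, receipts_coro)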