Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(WebSocket): handle OSError upon send() + fix max_receive_queue == 0 #2324

Merged
merged 10 commits into from
Sep 17, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pip-log.txt
.ecosystem
.tox
.pytest_cache
downloaded_files/
geckodriver.log
htmlcov
nosetests.xml
Expand Down
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ For your convenience, wheels containing pre-compiled binaries are available
from PyPI for the majority of common platforms. Even if a binary build for your
platform of choice is not available, ``pip`` will pick a pure-Python wheel.
You can also cythonize Falcon for your environment; see our
`Installation docs <https://falcon.readthedocs.io/en/stable/user/install.html>`__.
`Installation docs <https://falcon.readthedocs.io/en/stable/user/install.html>`__
for more information on this and other advanced options.

Dependencies
Expand Down
16 changes: 16 additions & 0 deletions docs/_newsfragments/2292.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Falcon will now raise an instance of
:class:`~falcon.errors.WebSocketDisconnected` from the :class:`OSError` that
the ASGI server signals in the case of a disconnected client (as per
the `ASGI HTTP & WebSocket protocol
<https://asgi.readthedocs.io/en/latest/specs/www.html#id2>`__ version ``2.4``).
It is worth noting though that Falcon's
:ref:`built-in receive buffer <ws_lost_connection>` normally detects the
``websocket.disconnect`` event itself prior the potentially failing attempt to
``send()``.

Disabling this built-in receive buffer (by setting
:attr:`~falcon.asgi.WebSocketOptions.max_receive_queue` to ``0``) was also
found to interfere with receiving ASGI WebSocket messages in an unexpected
way. The issue has been fixed so that setting this option to ``0`` now properly
bypasses the buffer altogether, and extensive test coverage has been added for
validating this scenario.
48 changes: 26 additions & 22 deletions docs/api/websocket.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,32 +112,36 @@ Lost Connections
----------------

When the app attempts to receive a message from the client, the ASGI server
emits a `disconnect` event if the connection has been lost for any reason. Falcon
surfaces this event by raising an instance of :class:`~.WebSocketDisconnected`
to the caller.

On the other hand, the ASGI spec requires the ASGI server to silently consume
messages sent by the app after the connection has been lost (i.e., it should
not be considered an error). Therefore, an endpoint that primarily streams
outbound events to the client might continue consuming resources unnecessarily
for some time after the connection is lost.
emits a ``disconnect`` event if the connection has been lost for any
reason. Falcon surfaces this event by raising an instance of
:class:`~.WebSocketDisconnected` to the caller.

On the other hand, the ASGI spec previously required the ASGI server to
silently consume messages sent by the app after the connection has been lost
(i.e., it should not be considered an error). Therefore, an endpoint that
primarily streams outbound events to the client could continue consuming
resources unnecessarily for some time after the connection is lost.
This aspect has been rectified in the ASGI HTTP spec version ``2.4``,
and calling ``send()`` on a closed connection should now raise an
error. Unfortunately, not all ASGI servers have adopted this new behavior
uniformly yet.

As a workaround, Falcon implements a small incoming message queue that is used
to detect a lost connection and then raise an instance of
:class:`~.WebSocketDisconnected` to the caller the next time it attempts to send
a message.

This workaround is only necessary when the app itself does not consume messages
from the client often enough to quickly detect when the connection is lost.
Otherwise, Falcon's receive queue can be disabled for a slight performance boost
by setting :attr:`~falcon.asgi.WebSocketOptions.max_receive_queue` to ``0`` via
:class:`~.WebSocketDisconnected` to the caller the next time it attempts to
send a message.
If your ASGI server of choice adheres to the spec version ``2.4``, this receive
queue can be safely disabled for a slight performance boost by setting
:attr:`~falcon.asgi.WebSocketOptions.max_receive_queue` to ``0`` via
:attr:`~falcon.asgi.App.ws_options`.

Note also that some ASGI server implementations do not strictly follow the ASGI
spec in this regard, and in fact will raise an error when the app attempts to
send a message after the client disconnects. If testing reveals this to be the
case for your ASGI server of choice, Falcon's own receive queue can be safely
disabled.
(We may revise this setting, and disable the queue by default in the future if
our testing indicates that all major ASGI servers have caught up with the
spec.)

Furthermore, even on non-compliant or older ASGI servers, this workaround is
only necessary when the app itself does not consume messages from the client
often enough to quickly detect when the connection is lost.
Otherwise, Falcon's receive queue can also be disabled as described above.

.. _ws_error_handling:

Expand Down
4 changes: 4 additions & 0 deletions e2e-tests/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
def create_app() -> falcon.asgi.App:
app = falcon.asgi.App()

# NOTE(vytas): E2E tests run Uvicorn, and the latest versions support ASGI
# HTTP/WSspec ver 2.4, so buffering on our side should not be needed.
app.ws_options.max_receive_queue = 0

hub = Hub()
app.add_route('/ping', Pong())
app.add_route('/sse', Events(hub))
Expand Down
48 changes: 45 additions & 3 deletions falcon/asgi/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import collections
from enum import auto
from enum import Enum
import re
from typing import Any, Deque, Dict, Iterable, Mapping, Optional, Tuple, Union

from falcon import errors
Expand All @@ -28,6 +29,9 @@
CLOSED = auto()


_CLIENT_DISCONNECTED_CAUSE = re.compile(r'received (\d\d\d\d)')


class WebSocket:
"""Represents a single WebSocket connection with a client."""

Expand All @@ -47,6 +51,8 @@
'subprotocols',
)

_asgi_receive: AsgiReceive
_asgi_send: AsgiSend
_state: _WebSocketState
_close_code: Optional[int]
subprotocols: Tuple[str, ...]
Expand Down Expand Up @@ -81,7 +87,12 @@
# event via one of their receive() calls, and there is no
# need for the added overhead.
self._buffered_receiver = _BufferedReceiver(receive, max_receive_queue)
self._asgi_receive = self._buffered_receiver.receive
if max_receive_queue > 0:
self._asgi_receive = self._buffered_receiver.receive

Check warning on line 91 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L91

Added line #L91 was not covered by tests
else:
# NOTE(vytas): Pass through the receive callable bypassing the
# buffered receiver in the case max_receive_queue is set to 0.
self._asgi_receive = receive

Check warning on line 95 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L95

Added line #L95 was not covered by tests
self._asgi_send = send

mh_text = media_handlers[WebSocketPayloadType.TEXT]
Expand Down Expand Up @@ -468,6 +479,8 @@
if self._buffered_receiver.client_disconnected:
self._state = _WebSocketState.CLOSED
self._close_code = self._buffered_receiver.client_disconnected_code

if self._state == _WebSocketState.CLOSED:
raise errors.WebSocketDisconnected(self._close_code)

try:
Expand All @@ -483,7 +496,16 @@

translated_ex = self._translate_webserver_error(ex)
if translated_ex:
raise translated_ex
# NOTE(vytas): Mark WebSocket as closed if we catch an error
# upon sending. This is useful when not using the buffered
# receiver, and not receiving anything at the given moment.
self._state = _WebSocketState.CLOSED

Check warning on line 502 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L502

Added line #L502 was not covered by tests
if isinstance(translated_ex, errors.WebSocketDisconnected):
self._close_code = translated_ex.code

Check warning on line 504 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L504

Added line #L504 was not covered by tests

# NOTE(vytas): Use the raise from form in order to preserve
# the traceback.
raise translated_ex from ex

Check warning on line 508 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L508

Added line #L508 was not covered by tests

# NOTE(kgriffs): Re-raise other errors directly so that we don't
# obscure the traceback.
Expand Down Expand Up @@ -529,6 +551,25 @@
'WebSocket subprotocol must be from the list sent by the client'
)

# NOTE(vytas): Per ASGI HTTP & WebSocket spec v2.4:
# If send() is called on a closed connection the server should raise
# a server-specific subclass of IOError.
# NOTE(vytas): Uvicorn 0.30.6 seems to conform to the spec only when
# using the wsproto stack, it then raises an instance of
# uvicorn.protocols.utils.ClientDisconnected.
if isinstance(ex, OSError):
close_code = None

Check warning on line 561 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L561

Added line #L561 was not covered by tests

# NOTE(vytas): If using the "websockets" backend, Uvicorn raises
# and instance of OSError from a websockets exception like this:
# "received 1001 (going away); then sent 1001 (going away)"
if ex.__cause__:
vytas7 marked this conversation as resolved.
Show resolved Hide resolved
match = _CLIENT_DISCONNECTED_CAUSE.match(str(ex.__cause__))

Check warning on line 567 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L567

Added line #L567 was not covered by tests
if match:
close_code = int(match.group(1))

Check warning on line 569 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L569

Added line #L569 was not covered by tests

return errors.WebSocketDisconnected(close_code)

Check warning on line 571 in falcon/asgi/ws.py

View check run for this annotation

Codecov / codecov/patch

falcon/asgi/ws.py#L571

Added line #L571 was not covered by tests

return None


Expand Down Expand Up @@ -679,7 +720,8 @@
self.client_disconnected_code = None

def start(self) -> None:
if self._pump_task is None:
# NOTE(vytas): Do not start anything if buffering is disabled.
if self._pump_task is None and self._max_queue > 0:
self._pump_task = asyncio.create_task(self._pump())

async def stop(self) -> None:
Expand Down
8 changes: 4 additions & 4 deletions falcon/testing/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,10 +785,10 @@
else:
assert self.closed

# NOTE(kgriffs): According to the ASGI spec, we are
# supposed to just silently eat events once the
# socket is disconnected.
pass
# NOTE(vytas): Tweaked in Falcon 4.0: we now simulate ASGI
# WebSocket protocol 2.4+, raising an instance of OSError upon
# send if the client has already disconnected.
raise falcon_errors.WebSocketDisconnected(self._close_code)

Check warning on line 791 in falcon/testing/helpers.py

View check run for this annotation

Codecov / codecov/patch

falcon/testing/helpers.py#L791

Added line #L791 was not covered by tests

# NOTE(kgriffs): Give whatever is waiting on the handshake or a
# collected data/text event a chance to progress.
Expand Down
24 changes: 24 additions & 0 deletions tests/asgi/_asgi_test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,29 @@ async def on_post(self, req, resp):
resp.status = falcon.HTTP_403


class WSOptions:
_SUPPORTED_KEYS = frozenset(
{'default_close_reasons', 'error_close_code', 'max_receive_queue'}
)

def __init__(self, ws_options):
self._ws_options = ws_options

async def on_get(self, req, resp):
resp.media = {
key: getattr(self._ws_options, key) for key in self._SUPPORTED_KEYS
}

async def on_patch(self, req, resp):
update = await req.get_media()
for key, value in update.items():
if key not in self._SUPPORTED_KEYS:
raise falcon.HTTPInvalidParam('unsupported option', key)
setattr(self._ws_options, key, value)

resp.status = falcon.HTTP_NO_CONTENT


def create_app():
app = falcon.asgi.App()
bucket = Bucket()
Expand All @@ -276,6 +299,7 @@ def create_app():
app.add_route('/forms', Multipart())
app.add_route('/jars', TestJar())
app.add_route('/feeds/{feed_id}', Feed())
app.add_route('/wsoptions', WSOptions(app.ws_options))

app.add_middleware(lifespan_handler)

Expand Down
15 changes: 14 additions & 1 deletion tests/asgi/test_asgi_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,20 @@ async def emitter():
class TestWebSocket:
@pytest.mark.parametrize('explicit_close', [True, False])
@pytest.mark.parametrize('close_code', [None, 4321])
async def test_hello(self, explicit_close, close_code, server_url_events_ws):
@pytest.mark.parametrize('max_receive_queue', [0, 4, 17])
async def test_hello(
self,
explicit_close,
close_code,
max_receive_queue,
server_base_url,
server_url_events_ws,
):
resp = requests.patch(
server_base_url + 'wsoptions', json={'max_receive_queue': max_receive_queue}
)
resp.raise_for_status()

echo_expected = 'Check 1 - \U0001f600'

extra_headers = {'X-Command': 'recv'}
Expand Down
Loading