-
Notifications
You must be signed in to change notification settings - Fork 106
[FIX] Issue #709: do not enqueue all responses #713
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
b3ce986
444f14b
24df937
d2f9dfc
c4b35c1
f3597bc
88b0428
3a53ffc
8617377
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
"""Memory usage of Websocket clients.""" | ||
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
from unittest import TestCase | ||
|
||
from xrpl.asyncio.clients import AsyncWebsocketClient | ||
from xrpl.clients.websocket_client import WebsocketClient | ||
from xrpl.models.currencies import XRP, IssuedCurrency | ||
from xrpl.models.requests import BookOffers | ||
|
||
try: | ||
from unittest import IsolatedAsyncioTestCase | ||
except ImportError: | ||
from aiounittest import AsyncTestCase as IsolatedAsyncioTestCase # type: ignore | ||
|
||
|
||
class TestAsyncWebsocketClient(IsolatedAsyncioTestCase): | ||
"""Memory usage of async-websocket client""" | ||
|
||
async def test_msg_queue_async_websocket_client( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test can still use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current version of the code uses but I don't like this version. Due to the use of Ideally, here is what I'd like to do: The
I'm trying to accomodate what do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about using subscriptions? Can those stay in the queue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not able to test the What am I missing? The structure of the code is similar to the tests here
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you're connecting to a standalone node, you also need to close ledgers independently. A standalone node won't do that automatically for you. That's why there's the additional Alternatively, you can add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might also be worth considering rewriting the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it, thanks.
are you proposing that we use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, fair. I was just thinking about commands that would legitimately grow the queue. |
||
self: TestAsyncWebsocketClient, | ||
) -> None: | ||
"""Test the rate of growth of the Message queue in async_websocket_client under | ||
persistent load. Admittedly, this is not a precise measure, rather its a proxy | ||
to measure the memory footprint of the client | ||
""" | ||
async with AsyncWebsocketClient("wss://s1.ripple.com") as client: | ||
for _ in range(5): | ||
await client.request( | ||
BookOffers( | ||
ledger_index="current", | ||
taker_gets=XRP(), | ||
taker_pays=IssuedCurrency( | ||
currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" | ||
), | ||
limit=500, | ||
) | ||
) | ||
|
||
await client.request( | ||
BookOffers( | ||
ledger_index="current", | ||
taker_gets=IssuedCurrency( | ||
currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" | ||
), | ||
taker_pays=XRP(), | ||
limit=500, | ||
) | ||
) | ||
|
||
self.assertEqual(client._messages.qsize(), 0) | ||
await asyncio.sleep(2) | ||
|
||
|
||
class TestSyncWebsocketClient(TestCase): | ||
"""Memory usage of sync-websocket client""" | ||
|
||
def test_msg_queue_sync_websocket_client( | ||
self: TestSyncWebsocketClient, | ||
) -> None: | ||
"""Test the rate of growth of the Message queue in sync_websocket_client under | ||
persistent load. Admittedly, this is not a precise measure, rather its a proxy | ||
to measure the memory footprint of the client | ||
""" | ||
with WebsocketClient("wss://s1.ripple.com") as client: | ||
for _ in range(5): | ||
client.request( | ||
BookOffers( | ||
ledger_index="current", | ||
taker_gets=XRP(), | ||
taker_pays=IssuedCurrency( | ||
currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" | ||
), | ||
limit=500, | ||
) | ||
) | ||
|
||
client.request( | ||
BookOffers( | ||
ledger_index="current", | ||
taker_gets=IssuedCurrency( | ||
currency="USD", issuer="rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq" | ||
), | ||
taker_pays=XRP(), | ||
limit=500, | ||
) | ||
) | ||
|
||
self.assertEqual(client._messages.qsize(), 0) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
"""A client for interacting with the rippled WebSocket API.""" | ||
|
||
from __future__ import annotations | ||
|
||
import asyncio | ||
|
@@ -135,9 +136,10 @@ async def _handler(self: WebsocketBase) -> None: | |
# if this response corresponds to request, fulfill the Future | ||
if "id" in response_dict and response_dict["id"] in self._open_requests: | ||
self._open_requests[response_dict["id"]].set_result(response_dict) | ||
|
||
# enqueue the response for the message queue | ||
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) | ||
# a response that fulfills a future is not enqueued again | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is more confusing than useful IMO. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed in 3a53ffc |
||
else: | ||
# otherwise, enqueue the response into the message queue | ||
cast(_MESSAGES_TYPE, self._messages).put_nowait(response_dict) | ||
|
||
def _set_up_future(self: WebsocketBase, request: Request) -> None: | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file should be called
test_websocket_client
and the description should be more general, as it may be used for other tests in the future.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed in 3a53ffc