From d65ca598755c21f68884a3679b7ceff59c2c6d0a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 15:20:09 +0200 Subject: [PATCH 1/3] Upgrade websockets --- .../bittensor/async_substrate_interface.py | 37 ++++++++++--------- requirements.txt | 3 +- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/bittensor_cli/src/bittensor/async_substrate_interface.py b/bittensor_cli/src/bittensor/async_substrate_interface.py index 82bfc9d0..2dfd43e5 100644 --- a/bittensor_cli/src/bittensor/async_substrate_interface.py +++ b/bittensor_cli/src/bittensor/async_substrate_interface.py @@ -4,12 +4,11 @@ from collections import defaultdict from dataclasses import dataclass from hashlib import blake2b -from typing import Optional, Any, Union, Callable, Awaitable, cast +from typing import Optional, Any, Union, Callable, Awaitable, cast, TYPE_CHECKING from async_property import async_property from bt_decode import PortableRegistry, decode as decode_by_type_string, MetadataV15 from bittensor_wallet import Keypair -from packaging import version from scalecodec import GenericExtrinsic from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset @@ -20,10 +19,14 @@ BlockNotFound, ) from substrateinterface.storage import StorageKey -import websockets +from websockets.asyncio.client import connect +from websockets.exceptions import ConnectionClosed from bittensor_cli.src.bittensor.utils import hex_to_bytes +if TYPE_CHECKING: + from websockets.asyncio.client import ClientConnection + ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] @@ -627,7 +630,7 @@ def __init__( # TODO allow setting max concurrent connections and rpc subscriptions per connection # TODO reconnection logic self.ws_url = ws_url - self.ws: Optional[websockets.WebSocketClientProtocol] = None + self.ws: Optional[ClientConnection] = None self.id = 0 self.max_subscriptions = max_subscriptions self.max_connections = max_connections @@ -655,7 +658,7 @@ async def __aenter__(self): async def _connect(self): self.ws = await asyncio.wait_for( - websockets.connect(self.ws_url, **self._options), timeout=10 + connect(self.ws_url, **self._options), timeout=10 ) async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -698,9 +701,7 @@ async def shutdown(self): async def _recv(self) -> None: try: - response = json.loads( - await cast(websockets.WebSocketClientProtocol, self.ws).recv() - ) + response = json.loads(await cast(ClientConnection, self.ws).recv()) async with self._lock: self._open_subscriptions -= 1 if "id" in response: @@ -709,7 +710,7 @@ async def _recv(self) -> None: self._received[response["params"]["subscription"]] = response else: raise KeyError(response) - except websockets.ConnectionClosed: + except ConnectionClosed: raise except KeyError as e: raise e @@ -720,7 +721,7 @@ async def _start_receiving(self): await self._recv() except asyncio.CancelledError: pass - except websockets.ConnectionClosed: + except ConnectionClosed: # TODO try reconnect, but only if it's needed raise @@ -737,7 +738,7 @@ async def send(self, payload: dict) -> int: try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) return original_id - except websockets.ConnectionClosed: + except ConnectionClosed: raise async def retrieve(self, item_id: int) -> Optional[dict]: @@ -774,13 +775,13 @@ def __init__( """ self.chain_endpoint = chain_endpoint self.__chain = chain_name - options = { - "max_size": 2**32, - "write_limit": 2**16, - } - if version.parse(websockets.__version__) < version.parse("14.0"): - options.update({"read_limit": 2**16}) - self.ws = Websocket(chain_endpoint, options=options) + self.ws = Websocket( + chain_endpoint, + options={ + "max_size": 2**32, + "write_limit": 2**16, + }, + ) self._lock = asyncio.Lock() self.last_block_hash: Optional[str] = None self.config = { diff --git a/requirements.txt b/requirements.txt index bb50a3a7..268d40ce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,6 @@ fuzzywuzzy~=0.18.0 netaddr~=1.3.0 numpy>=2.0.1 Jinja2 -packaging pycryptodome # Crypto PyYAML~=6.0.1 pytest @@ -16,6 +15,6 @@ rich~=13.7 scalecodec==1.2.11 substrate-interface~=1.7.9 typer~=0.12 -websockets>=12.0 +websockets>=14.1 bittensor-wallet>=2.1.0 bt-decode==0.2.0a0 \ No newline at end of file From 4e5c3cb1bb3c95311bb0c5d51f6745208759813a Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 16:13:15 +0200 Subject: [PATCH 2/3] Better-improved the asyncio websocket connection. --- .../src/bittensor/async_substrate_interface.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/bittensor_cli/src/bittensor/async_substrate_interface.py b/bittensor_cli/src/bittensor/async_substrate_interface.py index 2dfd43e5..869de573 100644 --- a/bittensor_cli/src/bittensor/async_substrate_interface.py +++ b/bittensor_cli/src/bittensor/async_substrate_interface.py @@ -652,15 +652,12 @@ async def __aenter__(self): self._exit_task.cancel() if not self._initialized: self._initialized = True - await self._connect() + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10 + ) self._receiving_task = asyncio.create_task(self._start_receiving()) return self - async def _connect(self): - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10 - ) - async def __aexit__(self, exc_type, exc_val, exc_tb): async with self._lock: self._in_use -= 1 @@ -701,7 +698,7 @@ async def shutdown(self): async def _recv(self) -> None: try: - response = json.loads(await cast(ClientConnection, self.ws).recv()) + response = json.loads(await self.ws.recv()) async with self._lock: self._open_subscriptions -= 1 if "id" in response: From c79a81600d2af3233b3335218f49a93b8304dd05 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Fri, 22 Nov 2024 16:24:19 +0200 Subject: [PATCH 3/3] Synced with bittensor implementation. --- bittensor_cli/src/bittensor/async_substrate_interface.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bittensor_cli/src/bittensor/async_substrate_interface.py b/bittensor_cli/src/bittensor/async_substrate_interface.py index 869de573..d9ea3765 100644 --- a/bittensor_cli/src/bittensor/async_substrate_interface.py +++ b/bittensor_cli/src/bittensor/async_substrate_interface.py @@ -439,7 +439,7 @@ def add_item( self.block_hashes[block_hash] = runtime def retrieve( - self, block: Optional[int], block_hash: Optional[str] + self, block: Optional[int] = None, block_hash: Optional[str] = None ) -> Optional["Runtime"]: if block is not None: return self.blocks.get(block) @@ -630,7 +630,7 @@ def __init__( # TODO allow setting max concurrent connections and rpc subscriptions per connection # TODO reconnection logic self.ws_url = ws_url - self.ws: Optional[ClientConnection] = None + self.ws: Optional["ClientConnection"] = None self.id = 0 self.max_subscriptions = max_subscriptions self.max_connections = max_connections @@ -1135,7 +1135,7 @@ async def create_storage_key( ------- StorageKey """ - runtime = await self.init_runtime(block_hash=block_hash) + await self.init_runtime(block_hash=block_hash) return StorageKey.create_from_storage_function( pallet, @@ -1555,7 +1555,7 @@ async def _process_response( self, response: dict, subscription_id: Union[int, str], - value_scale_type: Optional[str], + value_scale_type: Optional[str] = None, storage_item: Optional[ScaleType] = None, runtime: Optional[Runtime] = None, result_handler: Optional[ResultHandler] = None,