diff --git a/dank_mids/_config.py b/dank_mids/_config.py index f3999a2c..9f7f1b09 100644 --- a/dank_mids/_config.py +++ b/dank_mids/_config.py @@ -17,10 +17,9 @@ # With default AsyncBaseProvider settings, some dense calls will fail # due to aiohttp.TimeoutError where they would otherwise succeed. -# This is not due to the response time from the node, but the event loop timing. -# We set the default to 5 minutes but if you're doing serious work +# We set the default to 2 minutes but if you're doing serious work # you may want to increase it further. -AIOHTTP_TIMEOUT = ClientTimeout(int(os.environ.get("AIOHTTP_TIMEOUT", 300))) +AIOHTTP_TIMEOUT = ClientTimeout(int(os.environ.get("AIOHTTP_TIMEOUT", 120))) # Method-specific Semaphores diff --git a/dank_mids/controller.py b/dank_mids/controller.py index a5e62b4f..aff24865 100644 --- a/dank_mids/controller.py +++ b/dank_mids/controller.py @@ -16,7 +16,7 @@ from dank_mids._config import LOOP_INTERVAL from dank_mids._demo_mode import demo_logger from dank_mids.loggers import main_logger, sort_lazy_logger -from dank_mids.requests import JSONRPCBatch, Multicall, RPCRequest, eth_call +from dank_mids.requests import RPCRequest, eth_call from dank_mids.types import BlockId, ChainId from dank_mids.uid import UIDGenerator from dank_mids.worker import DankWorker @@ -48,61 +48,53 @@ def __init__(self, w3: Web3) -> None: raise NotImplementedError("Dank Mids currently does not support this network.\nTo add support, you just need to submit a PR adding the appropriate multicall contract addresses to this file:\nhttps://github.com/banteg/multicall.py/blob/master/multicall/constants.py") self.multicall2 = to_checksum_address(multicall2) self.no_multicall = {self.multicall2} if multicall is None else {self.multicall2, to_checksum_address(multicall)} + self.pending_eth_calls: List[eth_call] = [] + self.pending_rpc_calls: List[RPCRequest] = [] + self.num_pending_eth_calls: int = 0 self.worker = DankWorker(self) - self.call_uid = UIDGenerator() - self.futs: List[asyncio.Future] = [] - self.pending_eth_calls: DefaultDict[BlockId, Multicall] = defaultdict(lambda: Multicall(self.worker)) - self.pools_closed_lock = threading.Lock() - self.pending_rpc_calls = JSONRPCBatch(self.worker) self.is_running: bool = False - self._first = None - self._paused = asyncio.Event() - self._paused._value = True + self.call_uid = UIDGenerator() self._checkpoint: float = time() self._instance: int = sum(len(_instances) for _instances in instances.values()) instances[self.chain_id].append(self) # type: ignore def __repr__(self) -> str: - return f"" + return f"" async def __call__(self, method: RPCEndpoint, params: Any) -> RPCResponse: return await (eth_call(self, params) if method == "eth_call" else RPCRequest(self, method, params)) # type: ignore [return-value] - @property - def endpoint(self) -> str: - return self.w3.provider.endpoint_uri # type: ignore - @property def batcher(self) -> NotSoBrightBatcher: return self.worker.batcher + @property + def pools_closed_lock(self) -> threading.Lock: + return self.call_uid.lock + async def taskmaster_loop(self) -> None: self.is_running = True - self._paused.clear() while self.pending_eth_calls or self.pending_rpc_calls: - await self.raise_exceptions() + await asyncio.sleep(0) if (self.loop_is_ready or self.queue_is_full): await self.execute_multicall() - await asyncio.sleep(0) - self._paused.set() self.is_running = False async def execute_multicall(self) -> None: - self.checkpoint = time() i = 0 while self.pools_closed_lock.locked(): if i // 500 == int(i // 500): main_logger.debug('lock is locked') i += 1 await asyncio.sleep(.1) - - # NOTE we put this here to prevent a double locking issue - empty = JSONRPCBatch(self.worker) with self.pools_closed_lock: - eth_calls = dict(self.pending_eth_calls) + eth_calls: DefaultDict[BlockId, List[eth_call]] = defaultdict(list) + for call in self.pending_eth_calls: + eth_calls[call.block].append(call) self.pending_eth_calls.clear() - rpc_calls = self.pending_rpc_calls - self.pending_rpc_calls = empty + self.num_pending_eth_calls = 0 + rpc_calls = self.pending_rpc_calls[:] + self.pending_rpc_calls.clear() demo_logger.info(f'executing multicall (current cid: {self.call_uid.latest})') # type: ignore await self.worker.execute_batch(eth_calls, rpc_calls) @@ -123,15 +115,8 @@ def loop_is_ready(self) -> bool: @property def queue_is_full(self) -> bool: - return bool(sum(len(v) for v in self.pending_eth_calls.values()) >= self.batcher.step * 25) + return bool(len(self.pending_eth_calls) >= self.batcher.step * 25) - async def raise_exceptions(self) -> None: - if futs := self.futs[:]: - for fut in futs: - if fut.done(): - await fut - self.futs.remove(fut) - def reduce_batch_size(self, num_calls: int) -> None: new_step = round(num_calls * 0.99) if num_calls >= 100 else num_calls - 1 # NOTE: We need this check because one of the other multicalls in a batch might have already reduced `self.batcher.step` diff --git a/dank_mids/helpers.py b/dank_mids/helpers.py index be720098..d2bd7c81 100644 --- a/dank_mids/helpers.py +++ b/dank_mids/helpers.py @@ -8,7 +8,6 @@ from eth_utils.toolz import assoc, complement, compose, merge from hexbytes import HexBytes from multicall.utils import get_async_w3 -from tqdm.asyncio import tqdm_asyncio from web3 import Web3 from web3._utils.rpc_abi import RPC from web3.providers.async_base import AsyncBaseProvider @@ -35,15 +34,8 @@ def setup_dank_w3_from_sync(sync_w3: Web3) -> Web3: assert not sync_w3.eth.is_async and isinstance(sync_w3.provider, BaseProvider) return setup_dank_w3(get_async_w3(sync_w3)) -async def await_all(futs: Iterable[Awaitable], verbose: bool = False) -> None: - # NOTE: 'verbose' is mainly for debugging but feel free to have fun - if verbose is True: - generator = tqdm_asyncio.as_completed(futs if isinstance(futs, list) else [*futs]) - elif verbose is False: - generator = asyncio.as_completed(futs if isinstance(futs, list) else [*futs]) - else: - raise NotImplementedError(verbose) - for fut in generator: +async def await_all(futs: Iterable[Awaitable]) -> None: + for fut in asyncio.as_completed([*futs]): await fut del fut diff --git a/dank_mids/requests.py b/dank_mids/requests.py index c6bc386b..39940f5f 100644 --- a/dank_mids/requests.py +++ b/dank_mids/requests.py @@ -1,15 +1,12 @@ import abc import asyncio -import threading from collections import defaultdict -from time import time from typing import (TYPE_CHECKING, Any, DefaultDict, Dict, Generator, Generic, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union) import aiohttp import eth_retry -import requests from aiohttp import RequestInfo from eth_abi import decode_single, encode_single from eth_typing import ChecksumAddress @@ -27,7 +24,7 @@ from dank_mids.constants import BAD_HEXES, OVERRIDE_CODE from dank_mids.helpers import await_all from dank_mids.loggers import main_logger -from dank_mids.types import BatchId, BlockId, JsonrpcParams, RpcCallJson, T +from dank_mids.types import BatchId, BlockId, JsonrpcParams, RpcCallJson if TYPE_CHECKING: from dank_mids.controller import DankMiddlewareController @@ -58,16 +55,10 @@ def _reattempt_call_and_return_exception(target: ChecksumAddress, calldata: byte def _err_response(e: Exception) -> RPCError: """ Extract an error message from `e` to use in a spoof rpc response. """ if isinstance(e.args[0], str) or isinstance(e.args[0], RequestInfo): - if isinstance(e, AttributeError): - raise e err_msg = f"DankMidsError: {e.__class__.__name__}: {e.args}" - elif isinstance(e.args[0], AttributeError): - raise e.args[0] elif isinstance(e.args[0], Exception): err_msg = f"DankMidsError: {e.args[0].__class__.__name__}: {e.args[0].args}" elif not hasattr(e.args[0], '__contains__'): - if isinstance(e, AttributeError): - raise e err_msg = f"DankMidsError: {e.__class__.__name__}: {e.args}" elif "message" in e.args[0]: err_msg = e.args[0]["message"] @@ -86,8 +77,7 @@ def __init__(self) -> None: self.uid = self.controller.call_uid.next elif isinstance(self, _Batch): self.uid = self.worker.controller.call_uid.next - self._started = asyncio.Event() - self._response: asyncio.Queue[_Response] = asyncio.Queue() + self._response: Optional[_Response] = None def __await__(self) -> Generator[Any, None, Optional[_Response]]: return self.get_response().__await__() @@ -96,6 +86,16 @@ def __await__(self) -> Generator[Any, None, Optional[_Response]]: def __len__(self) -> int: pass + @property + def is_complete(self) -> bool: + return self._response is not None + + @property + def response(self) -> _Response: + if self._response is None: + raise ResponseNotReady(self) + return self._response + @abc.abstractmethod async def get_response(self) -> Optional[_Response]: pass @@ -103,22 +103,15 @@ async def get_response(self) -> Optional[_Response]: ### Single requests: -single_semaphore = asyncio.Semaphore(1) - -def wakeup(self: _RequestMeta): - # NOTE: We need to let the main thread's event loop know there are delivered contents as the base ueue implementation is not threadsafe. - self._response._loop._write_to_self() - class RPCRequest(_RequestMeta[RPCResponse]): def __init__(self, controller: "DankMiddlewareController", method: RPCEndpoint, params: Any): self.controller = controller - self.controller.checkpoint = time() self.method = method self.params = params super().__init__() if isinstance(self, eth_call) and self.multicall_compatible: - self.controller.pending_eth_calls[self.block].append(self) + self.controller.pending_eth_calls.append(self) else: self.controller.pending_rpc_calls.append(self) demo_logger.info(f'added to queue (cid: {self.uid})') # type: ignore @@ -142,38 +135,23 @@ def rpc_data(self) -> RpcCallJson: return {'jsonrpc': '2.0', 'id': self.uid, 'method': self.method, 'params': self.params} async def get_response(self) -> RPCResponse: - if not self._started.is_set(): - if self.controller._first is None: - self.controller._first = self - await asyncio.sleep(0) - if self.controller._first is self: - while not self.controller.loop_is_ready: - await asyncio.sleep(0) - await self.controller.execute_multicall() - self.controller._first = None - return await self.spoof_response(await self._response.get()) - - async def set_response(self, data: Union[str, AttributeDict, Exception], wakeup_main_loop: bool = True) -> None: - self._response.put_nowait(data) - if wakeup_main_loop: - wakeup(self) - - async def spoof_response(self, data: Union[bytes, str, AttributeDict, Exception]) -> RPCResponse: + if not self.controller.is_running: + await self.controller.taskmaster_loop() + while not self.is_complete: + await asyncio.sleep(0) + return self.response + + async def spoof_response(self, data: Union[str, AttributeDict, Exception]) -> None: spoof = {"id": self.uid, "jsonrpc": "dank_mids"} - if isinstance(data, bytes): - spoof["result"] = data.hex() # type: ignore - elif isinstance(data, (str, AttributeDict)): - spoof["result"] = data - elif isinstance(data, Exception): + if isinstance(data, Exception): spoof["error"] = _err_response(data) else: - raise NotImplementedError(data.__class__.__name__) + spoof["result"] = data # type: ignore if isinstance(self, eth_call): main_logger.debug(f"method: eth_call address: {self.target} spoof: {spoof}") else: main_logger.debug(f"method: {self.method} spoof: {spoof}") - return spoof - + self._response = spoof # type: ignore class eth_call(RPCRequest): def __init__(self, controller: "DankMiddlewareController", params: Any) -> None: @@ -196,19 +174,19 @@ def multicall_compatible(self) -> bool: def target(self) -> str: return self.params[0]["to"] - async def spoof_response(self, data: Union[bytes, Exception]) -> RPCResponse: # type: ignore + async def spoof_response(self, data: Union[bytes, Exception]) -> None: # type: ignore """ Sets and returns a spoof rpc response for this BatchedCall instance using data provided by the worker. """ # NOTE: If multicall failed, make sync call to get either: # - revert details # - successful response if _call_failed(data): data = await self.sync_call() - return await super().spoof_response(data) + await super().spoof_response(data.hex() if isinstance(data, bytes) else data) async def sync_call(self) -> Union[bytes, Exception]: """ Used to bypass DankMiddlewareController. """ - data = await self.controller.worker.run_in_executor( - _reattempt_call_and_return_exception, self.target, self.calldata, self.block, self.controller.sync_w3, loop=asyncio.get_event_loop() + data = await run_in_subprocess( + _reattempt_call_and_return_exception, self.target, self.calldata, self.block, self.controller.sync_w3 ) # If we were able to get a usable response from single call, add contract to `do_not_batch`. if not isinstance(data, Exception): @@ -227,9 +205,6 @@ def __init__(self, worker: "DankWorker", calls: Iterable[_Request]): self.worker: DankWorker = worker self.calls = list(calls) # type: ignore super().__init__() - self._len = len(self.calls) - self._len_lock = threading.Lock() - self._fut = None def __bool__(self) -> bool: return bool(self.calls) @@ -241,49 +216,15 @@ def __iter__(self) -> Iterator[_Request]: return iter(self.calls) def __len__(self) -> int: - with self._len_lock: - return self._len - - async def coroutine(self) -> None: - return await self + return len(self.calls) - def append(self, call: _Request, skip_check: bool = False) -> None: + def append(self, call: _Request) -> None: self.calls.append(call) - with self._len_lock: - self._len += 1 - if skip_check is False and self.is_full: - self.ensure_future() - - def extend(self, calls: Iterable[_Request], skip_check: bool = False) -> None: - self.calls.extend(calls) - with self._len_lock: - self._len += len(calls) - if skip_check is False and self.is_full: - self.ensure_future() @property def controller(self) -> "DankMiddlewareController": return self.worker.controller - def ensure_future(self) -> asyncio.Future: - if self._fut is not None: - return self._fut - with self.controller.pools_closed_lock: - self._ensure_future() - return self._fut - - def _ensure_future(self) -> asyncio.Future: - """Not threadsafe""" - for call in self.calls: - call._started.set() - if isinstance(call, Multicall): - for _call in call: - _call._started.set() - self._fut = asyncio.run_coroutine_threadsafe(self.coroutine(), loop=self.worker.event_loop) - self._fut.add_done_callback(self.raise_exception_in_main_thread) - self.controller.futs.append(self._fut) - self._post_future_cleanup() - @property def halfpoint(self) -> int: return len(self) // 2 @@ -301,11 +242,6 @@ def chunk0(self) -> List[_Request]: def chunk1(self) -> List[_Request]: return self.calls[self.halfpoint:] - def raise_exception_in_main_thread(self, fut: asyncio.Future) -> None: - """Callback used to raise any exceptions that occur in the worker thread in the main thread.""" - fut.result() - self.controller.futs.remove(fut) - def should_retry(self, e: Exception) -> bool: if "out of gas" in f"{e}": # TODO Remember which contracts/calls are gas guzzlers @@ -319,7 +255,7 @@ def should_retry(self, e: Exception) -> bool: class Multicall(_Batch[eth_call]): - """ Runs in worker thread. One-time use.""" + """ Runs in worker thread. """ method = "eth_call" fourbyte = function_signature_to_4byte_selector("tryBlockAndAggregate(bool,(address,bytes)[])") input_types = "(bool,(address,bytes)[])" @@ -353,18 +289,14 @@ def params(self) -> JsonrpcParams: @property def rpc_data(self) -> RpcCallJson: return {'jsonrpc': '2.0', 'id': self.uid, 'method': self.method, 'params': self.params} - - @property - def is_full(self) -> bool: - return len(self) >= self.controller.batcher.step - - async def get_response(self) -> None: + + async def get_response(self) -> List[RPCResponse]: rid = self.worker.request_uid.next demo_logger.info(f'request {rid} for multicall {self.bid} starting') # type: ignore try: - await self.set_response(await self.worker(*self.params)) + await self.spoof_response(await self.worker(*self.params)) except Exception as e: - await (self.bisect_and_retry() if self.should_retry(e) else self.set_response(e)) # type: ignore [misc] + await (self.bisect_and_retry() if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc] demo_logger.info(f'request {rid} for multicall {self.bid} complete') # type: ignore def should_retry(self, e: Exception) -> bool: @@ -379,39 +311,27 @@ def should_retry(self, e: Exception) -> bool: return True return len(self) > 1 - async def set_response(self, data: Union[bytes, str, Exception], wakeup_main_loop: bool = True) -> None: + async def spoof_response(self, data: Union[bytes, str, Exception]) -> None: """ If called from `self`, `response` will be bytes type. if called from a JSONRPCBatch, `response` will be str type. """ if isinstance(data, Exception): - await await_all(call.set_response(data, wakeup_main_loop=False) for call in self.calls) + await await_all(call.spoof_response(data) for call in self.calls) else: decoded: List[Tuple[bool, bytes]] _, _, decoded = await run_in_subprocess(decode_single, self.output_types, to_bytes(data)) - await await_all(call.set_response(data, wakeup_main_loop=False) for call, (_, data) in zip(self.calls, decoded)) - if wakeup_main_loop: - wakeup(self) + await await_all(call.spoof_response(data) for call, (_, data) in zip(self.calls, decoded)) async def bisect_and_retry(self) -> List[RPCResponse]: await await_all((Multicall(self.worker, chunk, f"{self.bid}_{i}") for i, chunk in enumerate(self.bisected))) - - def _post_future_cleanup(self) -> None: - self.controller.pending_eth_calls.pop(self.block) -def post_sync(endpoint, data) -> Union[bytes, Tuple[str, Exception]]: - response = requests.post(endpoint, json=data) - try: - return response.json() - except Exception as e: - return response._content.decode(), e - class JSONRPCBatch(_Batch[Union[Multicall, RPCRequest]]): def __init__(self, worker: "DankWorker", calls: List[Union[Multicall, RPCRequest]] = [], jid: Optional[BatchId] = None) -> None: super().__init__(worker, calls) self.jid = jid or self.worker.jsonrpc_batch_uid.next - self._fut: Optional[asyncio.Future] = None + self._locked = False @property def data(self) -> List[RpcCallJson]: @@ -435,59 +355,35 @@ def method_counts(self) -> Dict[RPCEndpoint, int]: @property def total_calls(self) -> int: return sum(len(call) for call in self.calls) - - @property - def is_full(self) -> bool: - return len(self) >= MAX_JSONRPC_BATCH_SIZE - - def extend(self, calls: "JSONRPCBatch", skip_check: bool=False) -> None: - if calls.is_single_multicall: - return self.append(calls, skip_check=skip_check) - super().extend(calls, skip_check=skip_check) + + def append(self, call: Union[Multicall, RPCRequest]) -> None: + if self._locked: + raise Exception(f"{self} is locked.") + self.calls.append(call) async def get_response(self) -> None: """ Runs in worker thread. """ + self._locked = True rid = self.worker.request_uid.next - if self.is_single_multicall: - return await self[0].get_response() - if DEMO_MODE: # When demo mode is disabled, we can save some CPU time by skipping this sum demo_logger.info(f'request {rid} for jsonrpc batch {self.jid} ({sum(len(batch) for batch in self.calls)} calls) starting') # type: ignore try: - # NOTE: We do this inline so we don't have to allocate the response to memory - await self.set_response(self.validate_responses(await self.post_sync())) + responses = await self.post() + self.validate_responses(responses) + await self.spoof_response(responses) except Exception as e: - await (self.bisect_and_retry() if self.should_retry(e) else self.set_response(e)) + await (self.bisect_and_retry() if self.should_retry(e) else self.spoof_response(e)) demo_logger.info(f'request {rid} for jsonrpc batch {self.jid} complete') # type: ignore - @eth_retry.auto_retry - async def post_sync(self) -> Union[Dict, List[bytes]]: - response = await self.worker.event_loop.run_in_executor(self.worker.executor, post_sync, self.controller.endpoint, self.data) - if isinstance(response, (List, dict)): - return response - elif isinstance(response, Tuple): - decoded, e = response - counts = self.method_counts - main_logger.info(f"json batch id: {self.jid} | len: {len(self)} | total calls: {self.total_calls}", ) - main_logger.info(f"methods called: {counts}") - if 'content length too large' in decoded or decoded == "": - if self.is_multicalls_only: - self.controller.reduce_batch_size(self.total_calls) - raise ValueError(decoded) - # This shouldn't run unless there are issues. I'll probably delete it later. - main_logger.info(f"decoded body: {decoded}") - raise e - raise NotImplementedError(response.__class__.__name__) - @eth_retry.auto_retry async def post(self) -> Union[Dict, List[bytes]]: """ Posts `jsonrpc_batch` to your node. A successful call returns a list. """ async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session: - responses = await session.post(self.controller.endpoint, json=self.data) # type: ignore + responses = await session.post(self.worker.endpoint, json=self.data) # type: ignore try: return await responses.json(content_type=responses.content_type) - except Exception as e: + except: counts = self.method_counts decoded = responses._body.decode() main_logger.info(f"json batch id: {self.jid} | len: {len(self)} | total calls: {self.total_calls}", ) @@ -498,7 +394,7 @@ async def post(self) -> Union[Dict, List[bytes]]: raise ValueError(decoded) # This shouldn't run unless there are issues. I'll probably delete it later. main_logger.info(f"decoded body: {decoded}") - main_logger.info(f"{e.__class__.__name__}: {e} {responses.content._exception}") + main_logger.info(f"exception: {responses.content._exception}") raise def should_retry(self, e: Exception) -> bool: @@ -514,26 +410,22 @@ def should_retry(self, e: Exception) -> bool: return True return self.is_single_multicall - async def set_response(self, response: Union[List[RPCResponse], Exception], wakeup_main_loop: bool = True) -> None: + async def spoof_response(self, response: Union[List[RPCResponse], Exception]) -> None: if isinstance(response, Exception): - await await_all(call.set_response(response, wakeup_main_loop=False) for call in self.calls) - else: - await await_all( - # NOTE: For some rpc methods, the result will be a dict we can't hash during the gather. - call.set_response(AttributeDict(result["result"], wakeup_main_loop=False) if isinstance(result["result"], dict) else result["result"]) # type: ignore - for call, result in zip(self.calls, response) - ) - if wakeup_main_loop: - wakeup(self) - - def validate_responses(self, responses: T) -> T: + return await await_all(call.spoof_response(response) for call in self.calls) + return await await_all( + # NOTE: For some rpc methods, the result will be a dict we can't hash during the gather. + call.spoof_response(AttributeDict(result["result"]) if isinstance(result["result"], dict) else result["result"]) # type: ignore + for call, result in zip(self.calls, response) + ) + + def validate_responses(self, responses) -> None: # A successful response will be a list if isinstance(responses, dict) and 'result' in responses and isinstance(responses['result'], dict) and 'message' in responses['result']: raise ValueError(responses['result']['message']) for response in responses: if 'result' not in response: raise ValueError(response) - return responses async def bisect_and_retry(self) -> None: await await_all( @@ -543,7 +435,3 @@ async def bisect_and_retry(self) -> None: for i, chunk in enumerate(self.bisected) if chunk ) - - def _post_future_cleanup(self) -> None: - self.controller.pending_rpc_calls = JSONRPCBatch(self.worker) - \ No newline at end of file diff --git a/dank_mids/types.py b/dank_mids/types.py index 3d63790c..0f9fc2d0 100644 --- a/dank_mids/types.py +++ b/dank_mids/types.py @@ -5,14 +5,12 @@ from web3.types import RPCEndpoint, RPCResponse if TYPE_CHECKING: - from dank_mids.requests import Multicall - -T = TypeVar("T") + from dank_mids.requests import eth_call ChainId = NewType("ChainId", int) BlockId = NewType("BlockId", str) BatchId = Union[int, str] -Multicalls = Dict[BlockId, "Multicall"] +CallsToExec = Dict[BlockId, List["eth_call"]] eth_callParams = TypedDict("eth_callParams", {"to": ChecksumAddress, "data": str}) OverrideParams = TypedDict("OverrideParams", {"code": str}) diff --git a/dank_mids/worker.py b/dank_mids/worker.py index a4dc3372..3532554f 100644 --- a/dank_mids/worker.py +++ b/dank_mids/worker.py @@ -5,27 +5,21 @@ import eth_retry from eth_typing import ChecksumAddress from multicall.multicall import NotSoBrightBatcher -from web3 import Web3 -from dank_mids import _config, executor +from dank_mids._config import GANACHE_FORK, MAX_JSONRPC_BATCH_SIZE from dank_mids.helpers import await_all -from dank_mids.requests import JSONRPCBatch, Multicall, _Batch -from dank_mids.types import Multicalls +from dank_mids.requests import JSONRPCBatch, Multicall, RPCRequest, _Batch +from dank_mids.types import CallsToExec from dank_mids.uid import UIDGenerator if TYPE_CHECKING: from dank_mids.controller import DankMiddlewareController -def call(endpoint: str, *request_args): - return Web3(endpoint).eth.call(*request_args) - - class DankWorker: """ Runs a second event loop in a subthread which is used to reduce congestion on the main event loop. - A second loop with less items running in parallel allows dank_mids to get your responses back to your - main loop faster. + This allows dank_mids to better communicate with your node while you abuse it with heavy loads. """ def __init__(self, controller: "DankMiddlewareController") -> None: self.controller = controller @@ -34,12 +28,11 @@ def __init__(self, controller: "DankMiddlewareController") -> None: self.multicall_uid: UIDGenerator = UIDGenerator() self.request_uid: UIDGenerator = UIDGenerator() self.jsonrpc_batch_uid: UIDGenerator = UIDGenerator() - self.state_override_not_supported: bool = _config.GANACHE_FORK or self.controller.chain_id == 100 # Gnosis Chain does not support state override. + self.state_override_not_supported: bool = GANACHE_FORK or self.controller.chain_id == 100 # Gnosis Chain does not support state override. self.event_loop = asyncio.new_event_loop() self.worker_thread = threading.Thread(target=self.start) self.worker_thread.start() - self.executor = executor.executor - + def start(self) -> None: """ Runs in worker thread. """ asyncio.set_event_loop(self.event_loop) @@ -52,39 +45,30 @@ async def loop(self) -> None: @eth_retry.auto_retry async def __call__(self, *request_args: Any) -> Any: - # NOTE: We make the actual reuest synchronously so we can get the results from the node without waiting for the event loop - return await self.run_in_executor(self.controller.sync_w3.eth.call, *request_args) # type: ignore - - async def run_in_executor(self, fn, *args, loop=None): #: Callable[P, T], *args: P.args) -> T: - return await (loop or self.event_loop).run_in_executor(self.executor, fn, *args) + return await self.controller.w3.eth.call(*request_args) # type: ignore + + @property + def endpoint(self) -> str: + return self.controller.w3.provider.endpoint_uri # type: ignore - async def execute_batch(self, calls_to_exec: Multicalls, rpc_calls: JSONRPCBatch) -> None: + async def execute_batch(self, calls_to_exec: CallsToExec, rpc_calls: List[RPCRequest]) -> None: """ Runs in main thread. """ - #asyncio.run_coroutine_threadsafe(self._execute_batch(calls_to_exec, rpc_calls), loop=self.event_loop).result() - self.controller.futs.append( - asyncio.run_coroutine_threadsafe( - #self._execute_batch(DankBatch(self, calls_to_exec, rpc_calls)), - - # NOTE we materialize the generator here so that the execution takes place in the main thread - await_all([*DankBatch(self, calls_to_exec, rpc_calls).coroutines]), - loop=self.event_loop - ) - ) + asyncio.run_coroutine_threadsafe(self._execute_batch(calls_to_exec, rpc_calls), self.event_loop).result() + + async def _execute_batch(self, calls_to_exec: CallsToExec, rpc_calls: List[RPCRequest]) -> None: + """ Runs in worker thread. """ + await DankBatch(self, calls_to_exec, rpc_calls) + class DankBatch: """ A batch of jsonrpc batches. """ - def __init__(self, worker: DankWorker, eth_calls: Multicalls, rpc_calls: JSONRPCBatch): + def __init__(self, worker: DankWorker, eth_calls: CallsToExec, rpc_calls: List[RPCRequest]): self.worker = worker - for mcall in eth_calls.values(): - for call in mcall: - call._started.set() - for call in rpc_calls: - call._started.set() self.eth_calls = eth_calls self.rpc_calls = rpc_calls - #def __await__(self) -> Generator[Any, None, Any]: - # return await_all(self.coroutines).__await__() + def __await__(self) -> Generator[Any, None, Any]: + return await_all(self.coroutines).__await__() @property def batcher(self) -> NotSoBrightBatcher: @@ -92,16 +76,26 @@ def batcher(self) -> NotSoBrightBatcher: @property def coroutines(self) -> Generator["_Batch", None, None]: - *full_batches, working_batch = self.batch_multicalls(list(self.eth_calls.values())) + multicalls_to_batch: List["Multicall"] = [] + for *full_batches, remaining_calls in (self.batcher.batch_calls(calls, self.batcher.step) for calls in self.eth_calls.values()): + yield from (Multicall(self.worker, batch) for batch in full_batches) + multicalls_to_batch.append(Multicall(self.worker, remaining_calls)) + # Combine multicalls into one or more jsonrpc batches + *full_batches, working_batch = self.batch_multicalls(multicalls_to_batch) - # Yield full batches then yield the rest + # Yield full batches then prepare the rest yield from full_batches - if len(working_batch) + len(self.rpc_calls) <= _config.MAX_JSONRPC_BATCH_SIZE: - working_batch.extend(self.rpc_calls, skip_check=True) - yield working_batch - else: - yield working_batch - yield self.rpc_calls + rpc_calls_to_batch = self.rpc_calls[:] + while rpc_calls_to_batch: + if len(working_batch) >= MAX_JSONRPC_BATCH_SIZE: + yield working_batch + working_batch = JSONRPCBatch(self.worker) + working_batch.append(rpc_calls_to_batch.pop()) + if working_batch: + if working_batch.is_single_multicall: + yield working_batch[0] # type: ignore [misc] + else: + yield working_batch def batch_multicalls(self, multicalls: List["Multicall"]) -> Generator["JSONRPCBatch", None, None]: """ Used to collect multicalls into batches without overwhelming the node with oversized calls. """ @@ -120,7 +114,7 @@ def batch_multicalls(self, multicalls: List["Multicall"]) -> Generator["JSONRPCB else: working_batch.append(mcall) eth_calls_in_batch += len(mcall) - if len(working_batch) >= _config.MAX_JSONRPC_BATCH_SIZE: + if len(working_batch) >= MAX_JSONRPC_BATCH_SIZE: # There are more than `MAX_JSONRPC_BATCH_SIZE` rpc calls packed into this batch, let's start a new one yield working_batch working_batch = JSONRPCBatch(self.worker) diff --git a/tests/test_dank_mids.py b/tests/test_dank_mids.py index d0a189bd..22c350fe 100644 --- a/tests/test_dank_mids.py +++ b/tests/test_dank_mids.py @@ -1,12 +1,11 @@ -from time import time from brownie import chain +from dank_mids import instances from multicall import Call from multicall.utils import await_awaitable, gather from web3._utils.rpc_abi import RPC -from dank_mids import _config, instances from tests.fixtures import dank_w3 CHAI = '0x06AF07097C9Eeb7fD685c692751D5C66dB49c215' @@ -22,17 +21,7 @@ def _get_controller(): def _get_worker(): return _get_controller().worker -def _configure_batch_sizes(): - """This is here so I can play around with diff params.""" - # NOTE we need to ensure a dank controller is created before we can modify batch size - await_awaitable(dank_w3.eth.get_block_number()) - _get_worker().batcher.step = 10_000 - _config.MAX_JSONRPC_BATCH_SIZE = 200 - -_configure_batch_sizes() - def test_dank_middleware(): - start = time() await_awaitable(gather(BIG_WORK)) cid = _get_controller().call_uid.latest mid = _get_worker().multicall_uid.latest @@ -40,7 +29,6 @@ def test_dank_middleware(): assert cid, "The DankMiddlewareController did not process any calls." assert mid, "The DankMiddlewareController did not process any batches." assert rid, "The DankMiddlewareController did not process any requests." - print(f'took {time() - start} seconds.') print(f"calls: {cid}") print(f"multicalls: {mid}") print(f"requests: {rid}") @@ -67,7 +55,7 @@ def test_next_bid(): assert _get_worker().multicall_uid.next + 1 == _get_worker().multicall_uid.next def test_other_methods(): - work = [dank_w3.eth.get_block_number() for i in range(1500)] + work = [dank_w3.eth.get_block_number() for i in range(50)] work.append(dank_w3.eth.get_block('0xe25822')) work.append(dank_w3.manager.coro_request(RPC.web3_clientVersion, [])) assert await_awaitable(gather(work))