Revert "Reduce memory and blocking (#55)"
This reverts commit aa0e1b0.
BobTheBuidler committed May 10, 2023
1 parent 7ac885e commit 50fd4a2
Showing 7 changed files with 126 additions and 282 deletions.
5 changes: 2 additions & 3 deletions dank_mids/_config.py
@@ -17,10 +17,9 @@

# With default AsyncBaseProvider settings, some dense calls will fail
# due to aiohttp.TimeoutError where they would otherwise succeed.
# This is not due to the response time from the node, but the event loop timing.
# We set the default to 5 minutes but if you're doing serious work
# We set the default to 2 minutes but if you're doing serious work
# you may want to increase it further.
AIOHTTP_TIMEOUT = ClientTimeout(int(os.environ.get("AIOHTTP_TIMEOUT", 300)))
AIOHTTP_TIMEOUT = ClientTimeout(int(os.environ.get("AIOHTTP_TIMEOUT", 120)))


# Method-specific Semaphores
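
The timeout above is read from the AIOHTTP_TIMEOUT environment variable once, at module import time, so it can only be overridden before dank_mids is first imported. A minimal sketch of that override pattern (the 600-second value is purely illustrative):

import os

# Must be set before dank_mids is imported, because dank_mids._config
# evaluates AIOHTTP_TIMEOUT at import time.
os.environ["AIOHTTP_TIMEOUT"] = "600"  # seconds

import dank_mids  # noqa: E402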
51 changes: 18 additions & 33 deletions dank_mids/controller.py
@@ -16,7 +16,7 @@
from dank_mids._config import LOOP_INTERVAL
from dank_mids._demo_mode import demo_logger
from dank_mids.loggers import main_logger, sort_lazy_logger
from dank_mids.requests import JSONRPCBatch, Multicall, RPCRequest, eth_call
from dank_mids.requests import RPCRequest, eth_call
from dank_mids.types import BlockId, ChainId
from dank_mids.uid import UIDGenerator
from dank_mids.worker import DankWorker
@@ -48,61 +48,53 @@ def __init__(self, w3: Web3) -> None:
raise NotImplementedError("Dank Mids currently does not support this network.\nTo add support, you just need to submit a PR adding the appropriate multicall contract addresses to this file:\nhttps://github.com/banteg/multicall.py/blob/master/multicall/constants.py")
self.multicall2 = to_checksum_address(multicall2)
self.no_multicall = {self.multicall2} if multicall is None else {self.multicall2, to_checksum_address(multicall)}
self.pending_eth_calls: List[eth_call] = []
self.pending_rpc_calls: List[RPCRequest] = []
self.num_pending_eth_calls: int = 0
self.worker = DankWorker(self)
self.call_uid = UIDGenerator()
self.futs: List[asyncio.Future] = []
self.pending_eth_calls: DefaultDict[BlockId, Multicall] = defaultdict(lambda: Multicall(self.worker))
self.pools_closed_lock = threading.Lock()
self.pending_rpc_calls = JSONRPCBatch(self.worker)
self.is_running: bool = False
self._first = None
self._paused = asyncio.Event()
self._paused._value = True
self.call_uid = UIDGenerator()
self._checkpoint: float = time()
self._instance: int = sum(len(_instances) for _instances in instances.values())
instances[self.chain_id].append(self) # type: ignore

def __repr__(self) -> str:
return f"<DankMiddlewareController instance={self._instance} chain={self.chain_id} endpoint={self.endpoint}>"
return f"<DankMiddlewareController instance={self._instance} chain={self.chain_id} endpoint={self.worker.endpoint}>"

async def __call__(self, method: RPCEndpoint, params: Any) -> RPCResponse:
return await (eth_call(self, params) if method == "eth_call" else RPCRequest(self, method, params)) # type: ignore [return-value]

@property
def endpoint(self) -> str:
return self.w3.provider.endpoint_uri # type: ignore

@property
def batcher(self) -> NotSoBrightBatcher:
return self.worker.batcher

@property
def pools_closed_lock(self) -> threading.Lock:
return self.call_uid.lock

async def taskmaster_loop(self) -> None:
self.is_running = True
self._paused.clear()
while self.pending_eth_calls or self.pending_rpc_calls:
await self.raise_exceptions()
await asyncio.sleep(0)
if (self.loop_is_ready or self.queue_is_full):
await self.execute_multicall()
await asyncio.sleep(0)
self._paused.set()
self.is_running = False

async def execute_multicall(self) -> None:
self.checkpoint = time()
i = 0
while self.pools_closed_lock.locked():
if i // 500 == int(i // 500):
main_logger.debug('lock is locked')
i += 1
await asyncio.sleep(.1)

# NOTE we put this here to prevent a double locking issue
empty = JSONRPCBatch(self.worker)
with self.pools_closed_lock:
eth_calls = dict(self.pending_eth_calls)
eth_calls: DefaultDict[BlockId, List[eth_call]] = defaultdict(list)
for call in self.pending_eth_calls:
eth_calls[call.block].append(call)
self.pending_eth_calls.clear()
rpc_calls = self.pending_rpc_calls
self.pending_rpc_calls = empty
self.num_pending_eth_calls = 0
rpc_calls = self.pending_rpc_calls[:]
self.pending_rpc_calls.clear()
demo_logger.info(f'executing multicall (current cid: {self.call_uid.latest})') # type: ignore
await self.worker.execute_batch(eth_calls, rpc_calls)

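In the restored execute_multicall above, the controller drains its pending queues under pools_closed_lock and buckets eth_calls by block so each block can be served by a single multicall. A simplified sketch of just that grouping step, with the locking and worker hand-off omitted (group_calls_by_block is a hypothetical helper name used only for illustration):

from collections import defaultdict
from typing import Any, DefaultDict, List

def group_calls_by_block(pending_eth_calls: List[Any]) -> DefaultDict[Any, List[Any]]:
    # Each pending eth_call carries the block it targets; calls for the same
    # block are collected together so they can share one multicall request.
    eth_calls: DefaultDict[Any, List[Any]] = defaultdict(list)
    for call in pending_eth_calls:
        eth_calls[call.block].append(call)
    return eth_calls
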
@@ -123,15 +115,8 @@ def loop_is_ready(self) -> bool:

@property
def queue_is_full(self) -> bool:
return bool(sum(len(v) for v in self.pending_eth_calls.values()) >= self.batcher.step * 25)
return bool(len(self.pending_eth_calls) >= self.batcher.step * 25)

async def raise_exceptions(self) -> None:
if futs := self.futs[:]:
for fut in futs:
if fut.done():
await fut
self.futs.remove(fut)

def reduce_batch_size(self, num_calls: int) -> None:
new_step = round(num_calls * 0.99) if num_calls >= 100 else num_calls - 1
# NOTE: We need this check because one of the other multicalls in a batch might have already reduced `self.batcher.step`
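reduce_batch_size shrinks the multicall step by roughly 1% when the failing batch was large and by a single call when it was already small; the NOTE above guards against shrinking a step that another multicall in the same batch has already reduced. A worked sketch of the sizing rule alone, with that guard omitted (proposed_step is a hypothetical helper name used only for illustration):

def proposed_step(num_calls: int) -> int:
    # ~1% reduction for large batches, one call at a time for small ones.
    return round(num_calls * 0.99) if num_calls >= 100 else num_calls - 1

assert proposed_step(1000) == 990
assert proposed_step(50) == 49
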
12 changes: 2 additions & 10 deletions dank_mids/helpers.py
@@ -8,7 +8,6 @@
from eth_utils.toolz import assoc, complement, compose, merge
from hexbytes import HexBytes
from multicall.utils import get_async_w3
from tqdm.asyncio import tqdm_asyncio
from web3 import Web3
from web3._utils.rpc_abi import RPC
from web3.providers.async_base import AsyncBaseProvider
@@ -35,15 +34,8 @@ def setup_dank_w3_from_sync(sync_w3: Web3) -> Web3:
assert not sync_w3.eth.is_async and isinstance(sync_w3.provider, BaseProvider)
return setup_dank_w3(get_async_w3(sync_w3))

async def await_all(futs: Iterable[Awaitable], verbose: bool = False) -> None:
# NOTE: 'verbose' is mainly for debugging but feel free to have fun
if verbose is True:
generator = tqdm_asyncio.as_completed(futs if isinstance(futs, list) else [*futs])
elif verbose is False:
generator = asyncio.as_completed(futs if isinstance(futs, list) else [*futs])
else:
raise NotImplementedError(verbose)
for fut in generator:
async def await_all(futs: Iterable[Awaitable]) -> None:
for fut in asyncio.as_completed([*futs]):
await fut
del fut

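After the revert, await_all simply drives its awaitables with asyncio.as_completed, so the first exception propagates as soon as the future that raised it finishes rather than after the whole batch. A minimal standalone usage sketch of that pattern (the coroutine names are illustrative):

import asyncio

async def fetch(i: int) -> int:
    await asyncio.sleep(0)
    return i

async def main() -> None:
    # as_completed yields futures in completion order; awaiting each one
    # re-raises the first failure without waiting for the remaining futures.
    for fut in asyncio.as_completed([fetch(i) for i in range(3)]):
        await fut

asyncio.run(main())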
