diff --git a/.gitignore b/.gitignore index 7881af65..404f2291 100644 --- a/.gitignore +++ b/.gitignore @@ -119,6 +119,7 @@ version.py # Ape stuff .build/ +.silverback-sessions/ **/.DS_Store *.swp diff --git a/example.py b/example.py index 60d13456..23039a7b 100644 --- a/example.py +++ b/example.py @@ -6,9 +6,9 @@ from ape_tokens import tokens # type: ignore[import] from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState +from silverback import AppState, CircuitBreaker, SilverbackApp -# Do this to initialize your app +# Do this first to initialize your app app = SilverbackApp() # NOTE: Don't do any networking until after initializing app @@ -17,53 +17,68 @@ @app.on_startup() -def app_startup(startup_state: SilverbackStartupState): - return {"message": "Starting...", "block_number": startup_state.last_block_seen} +def app_startup(startup_state: AppState): + # NOTE: This is called just as the app is put into "run" state, + # and handled by the first available worker + # raise Exception # NOTE: Any exception raised on startup aborts immediately + return {"block_number": startup_state.last_block_seen} + + +# Can handle some resource initialization for each worker, like LLMs or database connections +class MyDB: + def execute(self, query: str): + pass -# Can handle some initialization on startup, like models or network connections @app.on_worker_startup() -def worker_startup(state: TaskiqState): +def worker_startup(state: TaskiqState): # NOTE: You need the type hint here + # NOTE: Can put anything here, any python object works + state.db = MyDB() state.block_count = 0 - # state.db = MyDB() - return {"message": "Worker started."} + # raise Exception # NOTE: Any exception raised on worker startup aborts immediately # This is how we trigger off of new blocks @app.on_(chain.blocks) -# context must be a type annotated kwarg to be provided to the task +# NOTE: The type hint for block is `BlockAPI`, but 
we parse it using `EcosystemAPI` +# NOTE: If you need something from worker state, you have to use taskiq context def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): - context.state.block_count += 1 + context.state.db.execute(f"some query {block.number}") return len(block.transactions) # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. -@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25) -# NOTE: Typing isn't required +@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25) +# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type def exec_event1(log): if log.log_index % 7 == 3: - # If you ever want the app to shutdown under some scenario, call this exception - raise CircuitBreaker("Oopsie!") + # If you raise any exception, Silverback will track the failure and keep running + # NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself + raise ValueError("I don't like the number 3.") + return {"amount": log.amount} @app.on_(YFI.Approval) # Any handler function can be async too async def exec_event2(log: ContractLog): - return log.amount - + if log.log_index % 7 == 6: + # If you ever want the app to immediately shutdown under some scenario, raise this exception + raise CircuitBreaker("Oopsie!") -# Just in case you need to release some resources or something -@app.on_worker_shutdown() -def worker_shutdown(state): - return { - "message": f"Worker stopped after handling {state.block_count} blocks.", - "block_count": state.block_count, - } + return log.amount # A final job to execute on Silverback shutdown @app.on_shutdown() -def app_shutdown(state): - return {"message": "Stopping..."} +def app_shutdown(): + # raise Exception # NOTE: Any exception raised on shutdown is ignored + return {"some_metric": 123} + + +# Just in case you need to release some resources or something inside each worker +@app.on_worker_shutdown() +def 
worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here + state.db = None + # raise Exception # NOTE: Any exception raised on worker shutdown is ignored diff --git a/silverback/__init__.py b/silverback/__init__.py index dd26b077..43b3c961 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,10 +1,10 @@ from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException -from .types import SilverbackStartupState +from .state import AppState __all__ = [ + "AppState", "CircuitBreaker", "SilverbackApp", "SilverbackException", - "SilverbackStartupState", ] diff --git a/silverback/_cli.py b/silverback/_cli.py index 4a604690..f3a48f1d 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -34,6 +34,16 @@ def _runner_callback(ctx, param, val): raise ValueError(f"Failed to import runner '{val}'.") +def _recorder_callback(ctx, param, val): + if not val: + return None + + elif recorder := import_from_string(val): + return recorder() + + raise ValueError(f"Failed to import recorder '{val}'.") + + def _account_callback(ctx, param, val): if val: val = val.alias.replace("dev_", "TEST::") @@ -92,11 +102,16 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): help="An import str in format ':'", callback=_runner_callback, ) +@click.option( + "--recorder", + help="An import string in format ':'", + callback=_recorder_callback, +) @click.option("-x", "--max-exceptions", type=int, default=3) @click.argument("path") -def run(cli_ctx, account, runner, max_exceptions, path): +def run(cli_ctx, account, runner, recorder, max_exceptions, path): app = import_from_string(path) - runner = runner(app, max_exceptions=max_exceptions) + runner = runner(app, recorder=recorder, max_exceptions=max_exceptions) asyncio.run(runner.run()) diff --git a/silverback/application.py b/silverback/application.py index d1373cb2..81167ff3 100644 --- a/silverback/application.py +++ b/silverback/application.py @@ 
-13,7 +13,7 @@ from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError from .settings import Settings -from .types import TaskType +from .types import SilverbackID, TaskType @dataclass @@ -46,9 +46,15 @@ def __init__(self, settings: Settings | None = None): if not settings: settings = Settings() - self.network = settings.get_provider_context() + provider_context = settings.get_provider_context() # NOTE: This allows using connected ape methods e.g. `Contract` - provider = self.network.__enter__() + provider = provider_context.__enter__() + + self.identifier = SilverbackID( + name=settings.APP_NAME, + network=provider.network.name, + ecosystem=provider.network.ecosystem.name, + ) # Adjust defaults from connection if settings.NEW_BLOCK_TIMEOUT is None and ( @@ -64,20 +70,21 @@ def __init__(self, settings: Settings | None = None): self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list) self.poll_settings: dict[str, dict] = {} - atexit.register(self.network.__exit__, None, None, None) + atexit.register(provider_context.__exit__, None, None, None) self.signer = settings.get_signer() self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT self.start_block = settings.START_BLOCK - network_str = f'\n NETWORK="{provider.network.ecosystem.name}:{provider.network.name}"' signer_str = f"\n SIGNER={repr(self.signer)}" start_block_str = f"\n START_BLOCK={self.start_block}" if self.start_block else "" new_block_timeout_str = ( f"\n NEW_BLOCK_TIMEOUT={self.new_block_timeout}" if self.new_block_timeout else "" ) - logger.info( - f"Loaded Silverback App:{network_str}" + + network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}" + logger.success( + f'Loaded Silverback App:\n NETWORK="{network_choice}"' f"{signer_str}{start_block_str}{new_block_timeout_str}" ) @@ -120,11 +127,14 @@ def broker_task_decorator( def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: labels = {"task_type": str(task_type)} - if container and 
isinstance(container, ContractEvent): + # NOTE: Do *not* do `if container` because that does a `len(container)` call, + # which for ContractEvent queries *every single log* ever emitted, and really + # we only want to determine if it is not None + if container is not None and isinstance(container, ContractEvent): # Address is almost a certainty if the container is being used as a filter here. if contract_address := getattr(container.contract, "address", None): labels["contract_address"] = contract_address - labels["event_signature"] = container.abi.signature + labels["event_signature"] = f"{container.abi.signature}" broker_task = self.broker.register_task( handler, diff --git a/silverback/exceptions.py b/silverback/exceptions.py index 125e85a0..7bde82b0 100644 --- a/silverback/exceptions.py +++ b/silverback/exceptions.py @@ -1,7 +1,6 @@ -from typing import Any +from typing import Any, Sequence from ape.exceptions import ApeException -from ape.logging import logger from .types import TaskType @@ -31,14 +30,27 @@ class SilverbackException(ApeException): """Base Exception for any Silverback runtime faults.""" +# TODO: `ExceptionGroup` added in Python 3.11 +class StartupFailure(SilverbackException): + def __init__(self, *exceptions: Sequence[Exception]): + if error_str := "\n".join(str(e) for e in exceptions): + super().__init__(f"Startup failure(s):\n{error_str}") + else: + super().__init__("Startup failure(s) detected. 
See logs for details.") + + +class NoTasksAvailableError(SilverbackException): + def __init__(self): + super().__init__("No tasks to execute") + + class Halt(SilverbackException): def __init__(self): super().__init__("App halted, must restart manually") -class CircuitBreaker(SilverbackException): +class CircuitBreaker(Halt): """Custom exception (created by user) that will trigger an application shutdown.""" def __init__(self, message: str): - logger.error(message) - super().__init__(message) + super(SilverbackException, self).__init__(message) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index 3f50ecd5..7db17ab5 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -6,8 +6,7 @@ from eth_utils.conversions import to_hex from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult -from silverback.recorder import HandlerResult -from silverback.types import SilverbackID, TaskType +from silverback.types import TaskType from silverback.utils import hexbytes_dict @@ -22,11 +21,7 @@ def compute_block_time() -> int: return int((head.timestamp - genesis.timestamp) / head.number) - settings = kwargs.pop("silverback_settings") - self.block_time = self.chain_manager.provider.network.block_time or compute_block_time() - self.ident = SilverbackID.from_settings(settings) - self.recorder = settings.get_recorder() def pre_send(self, message: TaskiqMessage) -> TaskiqMessage: # TODO: Necessary because bytes/HexBytes doesn't encode/deocde well for some reason @@ -49,20 +44,28 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict: return message def _create_label(self, message: TaskiqMessage) -> str: - if labels_str := ",".join(f"{k}={v}" for k, v in message.labels.items()): + if labels_str := ",".join( + # NOTE: Have to add extra quotes around event signatures so they display as a string + f"{k}={v}" if k != "event_signature" else f'{k}="{v}"' + for k, v in message.labels.items() + if k != "task_name" + ): return 
f"{message.task_name}[{labels_str}]" else: return message.task_name def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: + # NOTE: Ensure we always have this, no matter what + message.labels["task_name"] = message.task_name + if "task_type" not in message.labels: return message # Not a silverback task - task_type = message.labels.pop("task_type") + task_type_str = message.labels.pop("task_type") try: - task_type = TaskType(task_type) + task_type = TaskType(task_type_str) except ValueError: return message # Not a silverback task @@ -97,21 +100,4 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult): f"{self._create_label(message)} " f"- {result.execution_time:.3f}s{percent_display}" ) - async def post_save(self, message: TaskiqMessage, result: TaskiqResult): - if not self.recorder: - return - - handler_result = HandlerResult.from_taskiq( - self.ident, - message.task_name, - message.labels.get("block_number"), - message.labels.get("log_index"), - result, - ) - - try: - await self.recorder.add_result(handler_result) - except Exception as err: - logger.error(f"Error storing result: {err}") - # NOTE: Unless stdout is ignored, error traceback appears in stdout, no need for `on_error` diff --git a/silverback/recorder.py b/silverback/recorder.py index 6ce0a2ce..0b8d2018 100644 --- a/silverback/recorder.py +++ b/silverback/recorder.py @@ -1,327 +1,181 @@ -import json -import os -import sqlite3 from abc import ABC, abstractmethod -from datetime import datetime, timezone -from typing import TypeVar +from pathlib import Path +from typing import Any, Iterator -from pydantic import BaseModel +from ape.logging import get_logger +from pydantic import BaseModel, Field from taskiq import TaskiqResult from typing_extensions import Self # Introduced 3.11 -from .types import SilverbackID +from .types import ( + Datapoint, + ScalarDatapoint, + ScalarType, + SilverbackID, + UTCTimestamp, + iso_format, + utc_now, +) -_HandlerReturnType = 
TypeVar("_HandlerReturnType") +logger = get_logger(__name__) -class SilverbackState(BaseModel): - instance: str - network: str - # Last block number seen by runner - last_block_seen: int - # Last block number processed by a worker - last_block_processed: int - updated: datetime +class TaskResult(BaseModel): + # NOTE: Model must eventually serialize using PyArrow/Parquet for long-term storage + # Task Info + task_name: str + execution_time: float + error: str | None = None -class HandlerResult(TaskiqResult): - instance: str - network: str - handler_id: str - block_number: int | None - log_index: int | None - created: datetime + # NOTE: intended to use default when creating a model with this type + completed: UTCTimestamp = Field(default_factory=utc_now) + + # System Metrics here (must default to None in case they are missing) + block_number: int | None = None + + # Custom user metrics here + metrics: dict[str, Datapoint] = {} + + @classmethod + def _extract_custom_metrics(cls, result: Any, task_name: str) -> dict[str, Datapoint]: + if isinstance(result, Datapoint): # type: ignore[arg-type,misc] + return {"result": result} + + elif isinstance(result, ScalarType): # type: ignore[arg-type,misc] + return {"result": ScalarDatapoint(data=result)} + + elif result is None: + return {} + + elif not isinstance(result, dict): + logger.warning(f"Cannot handle return type of '{task_name}': '{type(result)}'.") + return {} + + converted_result = {} + + for metric_name, metric_value in result.items(): + if isinstance(metric_value, Datapoint): # type: ignore[arg-type,misc] + converted_result[metric_name] = metric_value + + elif isinstance(metric_value, ScalarType): # type: ignore[arg-type,misc] + converted_result[metric_name] = ScalarDatapoint(data=metric_value) + + else: + logger.warning( + f"Cannot handle type of metric '{task_name}.{metric_name}':" + f" '{type(metric_value)}'." 
+ ) + + return converted_result + + @classmethod + def _extract_system_metrics(cls, labels: dict) -> dict: + metrics = {} + + if block_number := labels.get("block_number"): + metrics["block_number"] = int(block_number) + + return metrics @classmethod def from_taskiq( cls, - ident: SilverbackID, - handler_id: str, - block_number: int | None, - log_index: int | None, result: TaskiqResult, ) -> Self: + task_name = result.labels.pop("task_name", "") return cls( - instance=ident.identifier, - network=ident.network_choice, - handler_id=handler_id, - block_number=block_number, - log_index=log_index, - created=datetime.now(timezone.utc), - **result.dict(), + task_name=task_name, + execution_time=result.execution_time, + error=str(result.error) if result.error else None, + metrics=cls._extract_custom_metrics(result.return_value, task_name), + **cls._extract_system_metrics(result.labels), ) class BaseRecorder(ABC): - @abstractmethod - async def init(self): - """Handle any async initialization from Silverback settings (e.g. migrations).""" - ... + """ + Base class used for serializing task results to an external data recording process. - @abstractmethod - async def get_state(self, ident: SilverbackID) -> SilverbackState | None: - """Return the stored state for a Silverback instance""" - ... + Recorders are configured using the following environment variable: - @abstractmethod - async def set_state( - self, ident: SilverbackID, last_block_seen: int, last_block_processed: int - ) -> SilverbackState | None: - """Set the stored state for a Silverback instance""" - ... + - `SILVERBACK_RECORDER_CLASS`: Any fully qualified subclass of `BaseRecorder` as a string + """ @abstractmethod - async def get_latest_result( - self, ident: SilverbackID, handler: str | None = None - ) -> HandlerResult | None: - """Return the latest result for a Silverback instance's handler""" - ... 
+ async def init(self, app_id: SilverbackID): + """ + Handle any async initialization from Silverback settings (e.g. migrations). + """ @abstractmethod - async def add_result(self, v: HandlerResult): + async def add_result(self, result: TaskResult): """Store a result for a Silverback instance's handler""" - ... -class SQLiteRecorder(BaseRecorder): +class JSONLineRecorder(BaseRecorder): """ - SQLite implementation of BaseRecorder used to store application state and handler - result data. - - Usage: - - To use SQLite recorder, you must configure the following env vars: + Very basic implementation of BaseRecorder used to handle results by appending to a file + containing newline-separated JSON entries (https://jsonlines.org/). - - `RECORDER_CLASS`: `silverback.recorder.SQLiteRecorder` - - `SQLITE_PATH` (optional): A system file path or if blank it will be stored in-memory. - """ + The file structure that this Recorder uses leverages the value of `SILVERBACK_APP_NAME` + as well as the configured network to determine the location where files get saved: - SQL_GET_STATE = """ - SELECT last_block_seen, last_block_processed, updated - FROM silverback_state - WHERE instance = ? AND network = ?; - """ - SQL_INSERT_STATE = """ - INSERT INTO silverback_state ( - instance, network, last_block_seen, last_block_processed, updated - ) - VALUES (?, ?, ?, ?, ?); - """ - SQL_UPDATE_STATE = """ - UPDATE silverback_state - SET last_block_seen = ?, last_block_processed = ?, updated = ? - WHERE instance = ? AND network = ?; - """ - SQL_GET_RESULT_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, is_err, created, - return_value_blob - FROM silverback_result - WHERE instance = ? AND network = ? - ORDER BY created DESC - LIMIT 1; - """ - SQL_GET_HANDLER_LATEST = """ - SELECT handler_id, block_number, log_index, execution_time, is_err, created, - return_value_blob - FROM silverback_result - WHERE instance = ? AND network = ? AND handler_id = ? 
- ORDER BY created DESC - LIMIT 1; - """ - SQL_INSERT_RESULT = """ - INSERT INTO silverback_result ( - instance, network, handler_id, block_number, log_index, execution_time, - is_err, created, return_value_blob - ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); - """ + ./.silverback-sessions/ + / + / + session-.json # start time of each app session - con: sqlite3.Connection | None - initialized: bool = False - - async def init(self): - self.con = sqlite3.connect(os.environ.get("SQLITE_PATH", ":memory:")) - - cur = self.con.cursor() - cur.executescript( - """ - BEGIN; - CREATE TABLE IF NOT EXISTS silverback_state ( - instance text, - network text, - last_block_seen int, - last_block_processed int, - updated int - ); - CREATE TABLE IF NOT EXISTS silverback_result ( - instance text, - network text, - handler_id text, - block_number int, - log_index int, - execution_time real, - is_err bool, - created int, - return_value_blob blob - ); - CREATE UNIQUE INDEX IF NOT EXISTS silverback_state__instance - ON silverback_state(instance, network); - CREATE INDEX IF NOT EXISTS silverback_result__instance - ON silverback_result (instance, network); - CREATE INDEX IF NOT EXISTS silverback_result__handler - ON silverback_result (instance, network, handler_id); - CREATE INDEX IF NOT EXISTS silverback_result__is_err - ON silverback_result (is_err); - COMMIT; - """ - ) - cur.close() + Each app "session" (everytime the Runner is started up via `silverback run`) is recorded + in a separate file with the timestamp of the first handled task in its filename. 
- if not self.con: - raise Exception("Failed to setup SQLite connection") + Note that this format can be read by basic means (even in a JS frontend), or read + efficiently via Apache Arrow for more efficient big data processing: - self.initialized = True + https://arrow.apache.org/docs/python/json.html - async def get_state(self, ident: SilverbackID) -> SilverbackState | None: - if not self.initialized: - await self.init() + Usage: - assert self.con is not None + To use this recorder, you must configure the following environment variable: - cur = self.con.cursor() - res = cur.execute( - self.SQL_GET_STATE, - (ident.identifier, ident.network_choice), - ) - row = res.fetchone() + - `SILVERBACK_RECORDER_CLASS`: `"silverback.recorder:JSONLineRecorder"` - cur.close() + You may also want to give your app a unique name so the data does not get overwritten, + if you are using multiple apps from the same directory: - if row is None: - return None + - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name + """ - return SilverbackState( - instance=ident.identifier, - network=ident.network_choice, - last_block_seen=row[0], - last_block_processed=row[1], - updated=datetime.fromtimestamp(row[2], timezone.utc), + async def init(self, app_id: SilverbackID): + data_folder = ( + Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network ) + data_folder.mkdir(parents=True, exist_ok=True) - async def set_state( - self, ident: SilverbackID, last_block_seen: int, last_block_processed: int - ) -> SilverbackState | None: - if not self.initialized: - await self.init() - - assert self.con is not None - - cur = self.con.cursor() - res = cur.execute( - self.SQL_GET_STATE, - (ident.identifier, ident.network_choice), - ) - row = res.fetchone() - - now = datetime.now(timezone.utc) - now_stamp = int(now.timestamp()) - - if row is None: - cur.execute( - self.SQL_INSERT_STATE, - ( - ident.identifier, - ident.network_choice, - last_block_seen, - 
last_block_processed, - now_stamp, - ), - ) - else: - cur.execute( - self.SQL_UPDATE_STATE, - ( - last_block_seen, - last_block_processed, - now_stamp, - ident.identifier, - ident.network_choice, - ), - ) - - cur.close() - self.con.commit() - - return SilverbackState( - instance=ident.identifier, - network=ident.network_choice, - last_block_seen=last_block_seen, - last_block_processed=last_block_processed, - updated=now, - ) + self.session_results_file = data_folder / f"session-{iso_format(utc_now())}.jsonl" - async def get_latest_result( - self, ident: SilverbackID, handler: str | None = None - ) -> HandlerResult | None: - if not self.initialized: - await self.init() - - assert self.con is not None - - cur = self.con.cursor() - - if handler is not None: - res = cur.execute( - self.SQL_GET_HANDLER_LATEST, - (ident.identifier, ident.network_choice, handler), - ) - else: - res = cur.execute( - self.SQL_GET_RESULT_LATEST, - (ident.identifier, ident.network_choice), - ) - - row = res.fetchone() - - cur.close() - - if row is None: - return None - - return HandlerResult( - instance=ident.identifier, - network=ident.network_choice, - handler_id=row[0], - block_number=row[1], - log_index=row[2], - execution_time=row[3], - is_err=row[4], - created=datetime.fromtimestamp(row[5], timezone.utc), - return_value=json.loads(row[6]), - ) + async def add_result(self, result: TaskResult): + # NOTE: mode `a` means "append to file if exists" + # NOTE: JSONNL convention requires the use of `\n` as newline char + with self.session_results_file.open("a") as writer: + writer.write(result.model_dump_json()) + writer.write("\n") - async def add_result(self, v: HandlerResult): - if not self.initialized: - await self.init() - - assert self.con is not None - - cur = self.con.cursor() - - cur.execute( - self.SQL_INSERT_RESULT, - ( - v.instance, - v.network, - v.handler_id, - v.block_number, - v.log_index, - v.execution_time, - v.is_err, - v.created, - json.dumps(v.return_value), - ), - ) - 
cur.close() - self.con.commit() +def get_metrics(session: Path, task_name: str) -> Iterator[dict]: + """ + Useful function for fetching results and loading them for display. + """ + with open(session, "r") as file: + for line in file: + if ( + (result := TaskResult.model_validate_json(line)) + and result.task_name == task_name + and not result.error + ): + yield { + "block_number": result.block_number, + "execution_time": result.execution_time, + "completed": result.completed, + **{name: datapoint.data for name, datapoint in result.metrics.items()}, + } diff --git a/silverback/runner.py b/silverback/runner.py index ed67f86e..d571a574 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -6,66 +6,78 @@ from ape.logging import logger from ape.utils import ManagerAccessMixin from ape_ethereum.ecosystem import keccak -from taskiq import AsyncTaskiqDecoratedTask, TaskiqResult +from taskiq import AsyncTaskiqDecoratedTask, AsyncTaskiqTask from .application import SilverbackApp -from .exceptions import Halt, NoWebsocketAvailableError -from .recorder import BaseRecorder -from .settings import Settings +from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure +from .recorder import BaseRecorder, TaskResult +from .state import AppDatastore, AppState from .subscriptions import SubscriptionType, Web3SubscriptionsManager -from .types import SilverbackID, SilverbackStartupState, TaskType +from .types import TaskType from .utils import async_wrap_iter, hexbytes_dict -settings = Settings() - class BaseRunner(ABC): - def __init__(self, app: SilverbackApp, *args, max_exceptions: int = 3, **kwargs): + def __init__( + self, + app: SilverbackApp, + *args, + max_exceptions: int = 3, + recorder: BaseRecorder | None = None, + **kwargs, + ): self.app = app + self.recorder = recorder + self.state = None + self.datastore = AppDatastore() self.max_exceptions = max_exceptions self.exceptions = 0 - self.last_block_seen = 0 - 
self.last_block_processed = 0 - self.recorder: BaseRecorder | None = None - self.ident = SilverbackID.from_settings(settings) - def _handle_result(self, result: TaskiqResult): - if result.is_err: - self.exceptions += 1 + logger.info(f"Using {self.__class__.__name__}: max_exceptions={self.max_exceptions}") - else: + async def _handle_task(self, task: AsyncTaskiqTask): + result = await task.wait_result() + + if self.recorder: + await self.recorder.add_result(TaskResult.from_taskiq(result)) + + if not result.is_err: + # NOTE: Reset exception counter self.exceptions = 0 + return + + self.exceptions += 1 - if self.exceptions > self.max_exceptions: - raise Halt() + if self.exceptions > self.max_exceptions or isinstance(result.error, Halt): + result.raise_for_error() async def _checkpoint( - self, last_block_seen: int = 0, last_block_processed: int = 0 - ) -> tuple[int, int]: + self, + last_block_seen: int | None = None, + last_block_processed: int | None = None, + ): """Set latest checkpoint block number""" - if ( - last_block_seen > self.last_block_seen - or last_block_processed > self.last_block_processed - ): - logger.debug( - ( - f"Checkpoint block [seen={self.last_block_seen}, " - f"procssed={self.last_block_processed}]" - ) + assert self.state, f"{self.__class__.__name__}.run() not triggered." 
+ + logger.debug( + ( + f"Checkpoint block [seen={self.state.last_block_seen}, " + f"procssed={self.state.last_block_processed}]" ) - self.last_block_seen = max(last_block_seen, self.last_block_seen) - self.last_block_processed = max(last_block_processed, self.last_block_processed) + ) - if self.recorder: - try: - await self.recorder.set_state( - self.ident, self.last_block_seen, self.last_block_processed - ) - except Exception as err: - logger.error(f"Error settings state: {err}") + if last_block_seen: + self.state.last_block_seen = last_block_seen + if last_block_processed: + self.state.last_block_processed = last_block_processed + + if self.recorder: + try: + await self.datastore.set_state(self.state) - return self.last_block_seen, self.last_block_processed + except Exception as err: + logger.error(f"Error setting state: {err}") @abstractmethod async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): @@ -89,48 +101,86 @@ async def run(self): and process them by kicking events over to the configured broker. Raises: - :class:`~silverback.exceptions.Halt`: If there are no configured tasks to execute. + :class:`~silverback.exceptions.StartupFailure`: + If there was an exception during startup. + :class:`~silverback.exceptions.NoTasksAvailableError`: + If there are no configured tasks to execute. 
""" - self.recorder = settings.get_recorder() - + # Initialize recorder (if available) and fetch state if app has been run previously if self.recorder: - boot_state = await self.recorder.get_state(self.ident) - if boot_state: - self.last_block_seen = boot_state.last_block_seen - self.last_block_processed = boot_state.last_block_processed + await self.recorder.init(app_id=self.app.identifier) + + if startup_state := (await self.datastore.init(app_id=self.app.identifier)): + self.state = startup_state + else: # use empty state + self.state = AppState(last_block_seen=-1, last_block_processed=-1) + + # Initialize broker (run worker startup events) await self.app.broker.startup() # Execute Silverback startup task before we init the rest - for startup_task in self.app.tasks[TaskType.STARTUP]: - task = await startup_task.handler.kiq( - SilverbackStartupState( - last_block_seen=self.last_block_seen, - last_block_processed=self.last_block_processed, - ) + if startup_tasks := await asyncio.gather( + *(task_def.handler.kiq(self.state) for task_def in self.app.tasks[TaskType.STARTUP]) + ): + results = await asyncio.gather( + *(startup_task.wait_result() for startup_task in startup_tasks) ) - result = await task.wait_result() - self._handle_result(result) - tasks = [] - for task in self.app.tasks[TaskType.NEW_BLOCKS]: - tasks.append(self._block_task(task.handler)) + if any(result.is_err for result in results): + # NOTE: Abort before even starting to run + raise StartupFailure(*(result.error for result in results if result.is_err)) + + elif self.recorder: + converted_results = map(TaskResult.from_taskiq, results) + await asyncio.gather(*(self.recorder.add_result(r) for r in converted_results)) + + # NOTE: No need to handle results otherwise + + # Create our long-running event listeners + # NOTE: Any propagated failure in here should be handled such that shutdown tasks also run + # TODO: `asyncio.TaskGroup` added in Python 3.11 + listener_tasks = ( + *( + 
asyncio.create_task(self._block_task(task_def.handler)) + for task_def in self.app.tasks[TaskType.NEW_BLOCKS] + ), + *( + asyncio.create_task(self._event_task(task_def.container, task_def.handler)) + for task_def in self.app.tasks[TaskType.EVENT_LOG] + ), + ) - for task in self.app.tasks[TaskType.EVENT_LOG]: - tasks.append(self._event_task(task.container, task.handler)) + # NOTE: Safe to do this because no tasks have been scheduled to run yet + if len(listener_tasks) == 0: + raise NoTasksAvailableError() - if len(tasks) == 0: - raise Halt("No tasks to execute") + # Run until one task bubbles up an exception that should stop execution + tasks_with_errors, tasks_running = await asyncio.wait( + listener_tasks, return_when=asyncio.FIRST_EXCEPTION + ) + if runtime_errors := "\n".join(str(task.exception()) for task in tasks_with_errors): + # NOTE: In case we are somehow not displaying the error correctly with task status + logger.debug(f"Runtime error(s) detected, shutting down:\n{runtime_errors}") - try: - await asyncio.gather(*tasks) - except Exception as e: - logger.error(f"Fatal error detected, shutting down: '{e}'") + # Cancel any still running + (task.cancel() for task in tasks_running) + # NOTE: All listener tasks are shut down now + + # Execute Silverback shutdown task(s) before shutting down the broker and app + if shutdown_tasks := await asyncio.gather( + *(task_def.handler.kiq() for task_def in self.app.tasks[TaskType.SHUTDOWN]) + ): + asyncio.gather(*(shutdown_task.is_ready() for shutdown_task in shutdown_tasks)) + if any(result.is_err for result in results): + errors_str = "\n".join(str(result.error) for result in results if result.is_err) + logger.error(f"Errors while shutting down:\n{errors_str}") - # Execute Silverback shutdown task before shutting down the broker - for shutdown_task in self.app.tasks[TaskType.SHUTDOWN]: - task = await shutdown_task.handler.kiq() - result = self._handle_result(await task.wait_result()) + elif self.recorder: + 
converted_results = map(TaskResult.from_taskiq, results) + await asyncio.gather(*(self.recorder.add_result(r) for r in converted_results)) + + # NOTE: No need to handle results otherwise await self.app.broker.shutdown() @@ -142,7 +192,6 @@ class WebsocketRunner(BaseRunner, ManagerAccessMixin): def __init__(self, app: SilverbackApp, *args, **kwargs): super().__init__(app, *args, **kwargs) - logger.info(f"Using {self.__class__.__name__}: max_exceptions={self.max_exceptions}") # Check for websocket support if not (ws_uri := app.chain_manager.provider.ws_uri): @@ -157,16 +206,9 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for raw_block in self.subscriptions.get_subscription_data(sub_id): block = self.provider.network.ecosystem.decode_block(hexbytes_dict(raw_block)) - if block.number is not None: - await self._checkpoint(last_block_seen=block.number) - - block_task = await block_handler.kiq(raw_block) - result = await block_task.wait_result() - - self._handle_result(result) - - if block.number is not None: - await self._checkpoint(last_block_processed=block.number) + await self._checkpoint(last_block_seen=block.number) + await self._handle_task(await block_handler.kiq(raw_block)) + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask @@ -180,7 +222,9 @@ async def _event_task( address=contract_event.contract.address, topics=["0x" + keccak(text=contract_event.abi.selector).hex()], ) - logger.debug(f"Handling '{contract_event.name}' events via {sub_id}") + logger.debug( + f"Handling '{contract_event.contract.address}:{contract_event.name}' logs via {sub_id}" + ) async for raw_event in self.subscriptions.get_subscription_data(sub_id): event = next( # NOTE: `next` is okay since it only has one item @@ -190,15 +234,9 @@ async def _event_task( ) ) - if event.block_number is not None: - await 
self._checkpoint(last_block_seen=event.block_number) - - event_task = await event_handler.kiq(event) - result = await event_task.wait_result() - self._handle_result(result) - - if event.block_number is not None: - await self._checkpoint(last_block_processed=event.block_number) + await self._checkpoint(last_block_seen=event.block_number) + await self._handle_task(await event_handler.kiq(event)) + await self._checkpoint(last_block_processed=event.block_number) async def run(self): async with Web3SubscriptionsManager(self.ws_uri) as subscriptions: @@ -233,15 +271,9 @@ async def _block_task(self, block_handler: AsyncTaskiqDecoratedTask): async for block in async_wrap_iter( chain.blocks.poll_blocks(start_block=start_block, new_block_timeout=new_block_timeout) ): - if block.number is not None: - await self._checkpoint(last_block_seen=block.number) - - block_task = await block_handler.kiq(block) - result = await block_task.wait_result() - self._handle_result(result) - - if block.number is not None: - await self._checkpoint(last_block_processed=block.number) + await self._checkpoint(last_block_seen=block.number) + await self._handle_task(await block_handler.kiq(block)) + await self._checkpoint(last_block_processed=block.number) async def _event_task( self, contract_event: ContractEvent, event_handler: AsyncTaskiqDecoratedTask @@ -263,12 +295,6 @@ async def _event_task( async for event in async_wrap_iter( contract_event.poll_logs(start_block=start_block, new_block_timeout=new_block_timeout) ): - if event.block_number is not None: - await self._checkpoint(last_block_seen=event.block_number) - - event_task = await event_handler.kiq(event) - result = await event_task.wait_result() - self._handle_result(result) - - if event.block_number is not None: - await self._checkpoint(last_block_processed=event.block_number) + await self._checkpoint(last_block_seen=event.block_number) + await self._handle_task(await event_handler.kiq(event)) + await 
from pathlib import Path

from pydantic import BaseModel, Field

from .types import SilverbackID, UTCTimestamp, utc_now


# Progress checkpoint for an app: which blocks have been seen and processed so far.
# NOTE: Documented with comments (not a docstring) so the model's JSON schema is unchanged
class AppState(BaseModel):
    # Last block number seen by runner
    last_block_seen: int

    # Last block number processed by a worker
    last_block_processed: int

    # Last time the state was updated
    # NOTE: intended to use default when creating a model with this type
    last_updated: UTCTimestamp = Field(default_factory=utc_now)


class AppDatastore:
    """
    Very basic implementation used to store application state by storing/retrieving it
    from a JSON-encoded file.

    The file structure that this datastore uses leverages the value of `SILVERBACK_APP_NAME`
    as well as the configured network to determine the location where files get saved:

        ./.silverback-sessions/
          <app-name>/
            <ecosystem>/
              <network>/
                state.json  # always write here

    Note that this format can be read by basic means (even in a JS frontend).

    You may also want to give your app a unique name so the data does not get overwritten,
    if you are using multiple apps from the same directory:

    - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name
    """

    def _read_state(self) -> AppState | None:
        """Return the state stored in the backup file, or `None` if the file does not exist."""
        if not self.state_backup_file.exists():
            return None

        # NOTE: `BaseModel.parse_file` is deprecated in Pydantic v2, so read and validate the JSON
        return AppState.model_validate_json(self.state_backup_file.read_text())

    async def init(self, app_id: SilverbackID) -> AppState | None:
        """
        Select (and create, if needed) the data folder for `app_id` under the current
        working directory, then load any state saved there.

        Args:
            app_id: Identifier whose `name`, `ecosystem` and `network` form the folder path.

        Returns:
            The previously saved state, or `None` if no state file exists yet.
        """
        data_folder = (
            Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network
        )
        data_folder.mkdir(parents=True, exist_ok=True)

        # NOTE: `set_state` relies on this attribute, so `init` must be called first
        self.state_backup_file = data_folder / "state.json"

        return self._read_state()

    async def set_state(self, state: AppState) -> None:
        """
        Write `state` to the backup file, never moving either block number backwards.

        NOTE: `state` is updated in place: both block numbers are raised to the stored
        values when those are higher, and `last_updated` is refreshed.

        Args:
            state: The state to persist.
        """
        old_state = self._read_state()
        if old_state is not None:
            # Keep the highest block numbers recorded so far
            state.last_block_seen = max(state.last_block_seen, old_state.last_block_seen)
            state.last_block_processed = max(
                state.last_block_processed, old_state.last_block_processed
            )

        state.last_updated = utc_now()
        self.state_backup_file.write_text(state.model_dump_json())
class TaskType(str, Enum):
    """String-valued enumeration of the task categories."""

    STARTUP = "startup"
    NEW_BLOCKS = "block"
    EVENT_LOG = "event"
    SHUTDOWN = "shutdown"

    def __str__(self) -> str:
        # Render as the plain value instead of the default `TaskType.<NAME>` form
        return self.value


# Identifier made up of a name, an ecosystem and a network
class SilverbackID(BaseModel):
    name: str
    ecosystem: str
    network: str


def iso_format(dt: datetime) -> str:
    """Return the ISO-format string for `dt`."""
    formatted = dt.isoformat()
    return formatted


def utc_now() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)


# A `datetime` that is serialized to a string through `iso_format`
UTCTimestamp = Annotated[
    datetime,
    # TODO: Bug in TaskIQ can't serialize `datetime`
    PlainSerializer(iso_format, return_type=str),
]


# Common base for datapoint models; subclasses pin `type` to a literal value
class _BaseDatapoint(BaseModel):
    type: str  # discriminator


# NOTE: Maximum supported parquet integer type: https://parquet.apache.org/docs/file-format/types
Int96 = Annotated[int, Field(ge=-(2**95), le=2**95 - 1)]
# NOTE: only these types of data are implicitly converted e.g. `{"something": 1, "else": 0.001}`
ScalarType = bool | Int96 | float | Decimal


# Datapoint holding a single scalar value
class ScalarDatapoint(_BaseDatapoint):
    type: Literal["scalar"] = "scalar"
    data: ScalarType


# NOTE: Other datapoint types must be explicitly used

# TODO: Other datapoint types added to union here...
Datapoint = ScalarDatapoint