Skip to content

Commit

Permalink
feat!: add data acquisition to recorders (#64)
Browse files Browse the repository at this point in the history
* refactor!: add data acquisition models to recorder

* refactor: reformat metrics a bit

* refactor: change how state annotations work a bit

* refactor!: remove SQLRecorder, replace with JSONLineRecorder

* refactor: migrate CircuitBreaker exception to subclass of Halt

* refactor!: migrate recorder config to CLI callback

also refactor result handling

* refactor!: clean up startup process significantly

* fix: don't use py 3.10 unions yet

* fix: missing Annotated from py 3.8

* refactor: fix rebase misses

* refactor: use a more recent block number

* fix: do not check `len(ContractEvent)` for performance reasons

* fix: display event signatures as strings

* fix: ensure that task name ends up in labels

* fix: feedback from peer review

* refactor: shorten name of startup/shutdown tags

* fix: wrong label selected to pull block number

* refactor: ensure all tasks have task name (not just silverback)

* feat: store startup and shutdown result via recorder

* refactor!: move `.identifier` from runner to app

* refactor!: remove WorkerState, suggest using TaskiqState

* refactor!: move application state from Recorder to new state datastore

* fix: revert back to .pop bug fix

* fix: move extra quotes for event signatures to middleware

* refactor: rename variable for clarity

* fix: unused import

* fix: was using app.state on shutdown
  • Loading branch information
fubuloubu authored May 2, 2024
1 parent 8f544a6 commit ced902d
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 479 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ version.py

# Ape stuff
.build/
.silverback-sessions/

**/.DS_Store
*.swp
Expand Down
65 changes: 40 additions & 25 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from ape_tokens import tokens # type: ignore[import]
from taskiq import Context, TaskiqDepends, TaskiqState

from silverback import CircuitBreaker, SilverbackApp, SilverbackStartupState
from silverback import AppState, CircuitBreaker, SilverbackApp

# Do this to initialize your app
# Do this first to initialize your app
app = SilverbackApp()

# NOTE: Don't do any networking until after initializing app
Expand All @@ -17,53 +17,68 @@


@app.on_startup()
def app_startup(startup_state: SilverbackStartupState):
return {"message": "Starting...", "block_number": startup_state.last_block_seen}
def app_startup(startup_state: AppState):
# NOTE: This is called just as the app is put into "run" state,
# and handled by the first available worker
# raise Exception # NOTE: Any exception raised on startup aborts immediately
return {"block_number": startup_state.last_block_seen}


# Can handle some resource initialization for each worker, like LLMs or database connections
class MyDB:
def execute(self, query: str):
pass


# Can handle some initialization on startup, like models or network connections
@app.on_worker_startup()
def worker_startup(state: TaskiqState):
def worker_startup(state: TaskiqState): # NOTE: You need the type hint here
# NOTE: Can put anything here, any python object works
state.db = MyDB()
state.block_count = 0
# state.db = MyDB()
return {"message": "Worker started."}
# raise Exception # NOTE: Any exception raised on worker startup aborts immediately


# This is how we trigger off of new blocks
@app.on_(chain.blocks)
# context must be a type annotated kwarg to be provided to the task
# NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI`
# NOTE: If you need something from worker state, you have to use taskiq context
def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]):
context.state.block_count += 1
context.state.db.execute(f"some query {block.number}")
return len(block.transactions)


# This is how we trigger off of events
# Set new_block_timeout to adjust the expected block time.
@app.on_(USDC.Transfer, start_block=18588777, new_block_timeout=25)
# NOTE: Typing isn't required
@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25)
# NOTE: Typing isn't required, it will still be an Ape `ContractLog` type
def exec_event1(log):
if log.log_index % 7 == 3:
# If you ever want the app to shutdown under some scenario, call this exception
raise CircuitBreaker("Oopsie!")
# If you raise any exception, Silverback will track the failure and keep running
# NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself
raise ValueError("I don't like the number 3.")

return {"amount": log.amount}


@app.on_(YFI.Approval)
# Any handler function can be async too
async def exec_event2(log: ContractLog):
return log.amount

if log.log_index % 7 == 6:
# If you ever want the app to immediately shutdown under some scenario, raise this exception
raise CircuitBreaker("Oopsie!")

# Just in case you need to release some resources or something
@app.on_worker_shutdown()
def worker_shutdown(state):
return {
"message": f"Worker stopped after handling {state.block_count} blocks.",
"block_count": state.block_count,
}
return log.amount


# A final job to execute on Silverback shutdown
@app.on_shutdown()
def app_shutdown(state):
return {"message": "Stopping..."}
def app_shutdown():
# raise Exception # NOTE: Any exception raised on shutdown is ignored
return {"some_metric": 123}


# Just in case you need to release some resources or something inside each worker
@app.on_worker_shutdown()
def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here
state.db = None
# raise Exception # NOTE: Any exception raised on worker shutdown is ignored
4 changes: 2 additions & 2 deletions silverback/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .application import SilverbackApp
from .exceptions import CircuitBreaker, SilverbackException
from .types import SilverbackStartupState
from .state import AppState

__all__ = [
"AppState",
"CircuitBreaker",
"SilverbackApp",
"SilverbackException",
"SilverbackStartupState",
]
19 changes: 17 additions & 2 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ def _runner_callback(ctx, param, val):
raise ValueError(f"Failed to import runner '{val}'.")


def _recorder_callback(ctx, param, val):
if not val:
return None

elif recorder := import_from_string(val):
return recorder()

raise ValueError(f"Failed to import recorder '{val}'.")


def _account_callback(ctx, param, val):
if val:
val = val.alias.replace("dev_", "TEST::")
Expand Down Expand Up @@ -92,11 +102,16 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
help="An import str in format '<module>:<CustomRunner>'",
callback=_runner_callback,
)
@click.option(
"--recorder",
help="An import string in format '<module>:<CustomRecorder>'",
callback=_recorder_callback,
)
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.argument("path")
def run(cli_ctx, account, runner, max_exceptions, path):
def run(cli_ctx, account, runner, recorder, max_exceptions, path):
app = import_from_string(path)
runner = runner(app, max_exceptions=max_exceptions)
runner = runner(app, recorder=recorder, max_exceptions=max_exceptions)
asyncio.run(runner.run())


Expand Down
28 changes: 19 additions & 9 deletions silverback/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError
from .settings import Settings
from .types import TaskType
from .types import SilverbackID, TaskType


@dataclass
Expand Down Expand Up @@ -46,9 +46,15 @@ def __init__(self, settings: Settings | None = None):
if not settings:
settings = Settings()

self.network = settings.get_provider_context()
provider_context = settings.get_provider_context()
# NOTE: This allows using connected ape methods e.g. `Contract`
provider = self.network.__enter__()
provider = provider_context.__enter__()

self.identifier = SilverbackID(
name=settings.APP_NAME,
network=provider.network.name,
ecosystem=provider.network.ecosystem.name,
)

# Adjust defaults from connection
if settings.NEW_BLOCK_TIMEOUT is None and (
Expand All @@ -64,20 +70,21 @@ def __init__(self, settings: Settings | None = None):
self.tasks: defaultdict[TaskType, list[TaskData]] = defaultdict(list)
self.poll_settings: dict[str, dict] = {}

atexit.register(self.network.__exit__, None, None, None)
atexit.register(provider_context.__exit__, None, None, None)

self.signer = settings.get_signer()
self.new_block_timeout = settings.NEW_BLOCK_TIMEOUT
self.start_block = settings.START_BLOCK

network_str = f'\n NETWORK="{provider.network.ecosystem.name}:{provider.network.name}"'
signer_str = f"\n SIGNER={repr(self.signer)}"
start_block_str = f"\n START_BLOCK={self.start_block}" if self.start_block else ""
new_block_timeout_str = (
f"\n NEW_BLOCK_TIMEOUT={self.new_block_timeout}" if self.new_block_timeout else ""
)
logger.info(
f"Loaded Silverback App:{network_str}"

network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}"
logger.success(
f'Loaded Silverback App:\n NETWORK="{network_choice}"'
f"{signer_str}{start_block_str}{new_block_timeout_str}"
)

Expand Down Expand Up @@ -120,11 +127,14 @@ def broker_task_decorator(
def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask:
labels = {"task_type": str(task_type)}

if container and isinstance(container, ContractEvent):
# NOTE: Do *not* do `if container` because that does a `len(container)` call,
# which for ContractEvent queries *every single log* ever emitted, and really
# we only want to determine if it is not None
if container is not None and isinstance(container, ContractEvent):
# Address is almost a certainty if the container is being used as a filter here.
if contract_address := getattr(container.contract, "address", None):
labels["contract_address"] = contract_address
labels["event_signature"] = container.abi.signature
labels["event_signature"] = f"{container.abi.signature}"

broker_task = self.broker.register_task(
handler,
Expand Down
22 changes: 17 additions & 5 deletions silverback/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Any
from typing import Any, Sequence

from ape.exceptions import ApeException
from ape.logging import logger

from .types import TaskType

Expand Down Expand Up @@ -31,14 +30,27 @@ class SilverbackException(ApeException):
"""Base Exception for any Silverback runtime faults."""


# TODO: `ExceptionGroup` added in Python 3.11
class StartupFailure(SilverbackException):
def __init__(self, *exceptions: Sequence[Exception]):
if error_str := "\n".join(str(e) for e in exceptions):
super().__init__(f"Startup failure(s):\n{error_str}")
else:
super().__init__("Startup failure(s) detected. See logs for details.")


class NoTasksAvailableError(SilverbackException):
def __init__(self):
super().__init__("No tasks to execute")


class Halt(SilverbackException):
def __init__(self):
super().__init__("App halted, must restart manually")


class CircuitBreaker(SilverbackException):
class CircuitBreaker(Halt):
"""Custom exception (created by user) that will trigger an application shutdown."""

def __init__(self, message: str):
logger.error(message)
super().__init__(message)
super(SilverbackException, self).__init__(message)
38 changes: 12 additions & 26 deletions silverback/middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
from eth_utils.conversions import to_hex
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult

from silverback.recorder import HandlerResult
from silverback.types import SilverbackID, TaskType
from silverback.types import TaskType
from silverback.utils import hexbytes_dict


Expand All @@ -22,11 +21,7 @@ def compute_block_time() -> int:

return int((head.timestamp - genesis.timestamp) / head.number)

settings = kwargs.pop("silverback_settings")

self.block_time = self.chain_manager.provider.network.block_time or compute_block_time()
self.ident = SilverbackID.from_settings(settings)
self.recorder = settings.get_recorder()

def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
# TODO: Necessary because bytes/HexBytes doesn't encode/deocde well for some reason
Expand All @@ -49,20 +44,28 @@ def fix_dict(data: dict, recurse_count: int = 0) -> dict:
return message

def _create_label(self, message: TaskiqMessage) -> str:
if labels_str := ",".join(f"{k}={v}" for k, v in message.labels.items()):
if labels_str := ",".join(
# NOTE: Have to add extra quotes around event signatures so they display as a string
f"{k}={v}" if k != "event_signature" else f'{k}="{v}"'
for k, v in message.labels.items()
if k != "task_name"
):
return f"{message.task_name}[{labels_str}]"

else:
return message.task_name

def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
# NOTE: Ensure we always have this, no matter what
message.labels["task_name"] = message.task_name

if "task_type" not in message.labels:
return message # Not a silverback task

task_type = message.labels.pop("task_type")
task_type_str = message.labels.pop("task_type")

try:
task_type = TaskType(task_type)
task_type = TaskType(task_type_str)
except ValueError:
return message # Not a silverback task

Expand Down Expand Up @@ -97,21 +100,4 @@ def post_execute(self, message: TaskiqMessage, result: TaskiqResult):
f"{self._create_label(message)} " f"- {result.execution_time:.3f}s{percent_display}"
)

async def post_save(self, message: TaskiqMessage, result: TaskiqResult):
if not self.recorder:
return

handler_result = HandlerResult.from_taskiq(
self.ident,
message.task_name,
message.labels.get("block_number"),
message.labels.get("log_index"),
result,
)

try:
await self.recorder.add_result(handler_result)
except Exception as err:
logger.error(f"Error storing result: {err}")

# NOTE: Unless stdout is ignored, error traceback appears in stdout, no need for `on_error`
Loading

0 comments on commit ced902d

Please sign in to comment.