From 24f789ce2d5ce2253ca9eb13b6c8bbe81020b2f5 Mon Sep 17 00:00:00 2001
From: AlstonLi007 <148654191+AlstonLi007@users.noreply.github.com>
Date: Thu, 9 Oct 2025 09:53:57 -0500
Subject: [PATCH] Add OpenBB Lab helpers

---
 README.md                         |   8 +
 docs/sdk.md                       |  48 ++++++
 openbb_lab/__init__.py            |  39 +++++
 openbb_lab/platform/__init__.py   |  33 ++++
 openbb_lab/platform/data_plane.py | 129 +++++++++++++++
 openbb_lab/platform/engine.py     | 253 ++++++++++++++++++++++++++++++
 openbb_lab/sdk.py                 |  35 +++++
 7 files changed, 545 insertions(+)
 create mode 100644 README.md
 create mode 100644 docs/sdk.md
 create mode 100644 openbb_lab/__init__.py
 create mode 100644 openbb_lab/platform/__init__.py
 create mode 100644 openbb_lab/platform/data_plane.py
 create mode 100644 openbb_lab/platform/engine.py
 create mode 100644 openbb_lab/sdk.py

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..4a54a86
--- /dev/null
+++ b/README.md
@@ -0,0 +1,8 @@
+# OpenBB Lab Helpers
+
+This repository provides lightweight helpers that emulate the platform surface
+of OpenBB Lab. The modules introduced in this change expose the backtesting and
+risk engines at the top level and provide Arrow Flight integration helpers for
+data ingestion services.
+
+For usage examples, see [docs/sdk.md](docs/sdk.md).
diff --git a/docs/sdk.md b/docs/sdk.md
new file mode 100644
index 0000000..676ba20
--- /dev/null
+++ b/docs/sdk.md
@@ -0,0 +1,48 @@
+# OpenBB Lab SDK Helpers
+
+The SDK exposes a thin layer around the platform engines so that notebooks and
+external runtimes can interact with the Lab components without importing the
+internal modules directly.
+
+## Backtesting
+
+Use `openbb_lab.sdk.run_backtest` to execute a strategy via an
+`openbb_lab.platform.engine.BacktestEngine`. The helper forwards all arguments
+to the engine and persists any tabular outputs as Apache Parquet files when an
+`output_dir` is provided.
+
+```
+from pathlib import Path
+from openbb_lab.sdk import BacktestEngine, run_backtest
+
+engine = BacktestEngine()
+result = run_backtest(engine, data=my_table, output_dir=Path("./artifacts"))
+```
+
+## Risk Checks
+
+`openbb_lab.sdk.risk_check` runs a batch of `OrderTicket` instances through the
+`RiskEngine` and returns a single `RiskAssessment` object describing the
+violations discovered.
+
+```
+from openbb_lab.sdk import OrderTicket, risk_check
+
+assessment = risk_check([
+    OrderTicket(symbol="AAPL", quantity=10, side="buy", price=188.2),
+])
+```
+
+## Data Plane
+
+The helper `openbb_lab.sdk.flight_register_table` registers callable readers
+with a `FlightCatalog`. Pass one of the well-known schema aliases (`"ticks"`,
+`"bars"`, or `"depth"`) or your own `pyarrow.Schema` to describe the payload
+returned by the reader.
+
+```
+from openbb_lab.sdk import FlightCatalog, flight_register_table
+
+catalog = FlightCatalog()
+flight_register_table(catalog, "sample_ticks", lambda: table, schema="ticks")
+```
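The Parquet files written by `run_backtest` are named after the keys of the result's `tables` mapping, so a run like the Backtesting example above produces `input.parquet` (and `strategy.parquet` when a strategy is supplied) inside `./artifacts`. A minimal sketch of loading an artifact back with `pyarrow`; the path comes from that example and is otherwise illustrative:

```
import pyarrow.parquet as pq

# Each result table is written as <name>.parquet inside the output directory.
input_table = pq.read_table("artifacts/input.parquet")
print(input_table.num_rows)
```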
diff --git a/openbb_lab/__init__.py b/openbb_lab/__init__.py
new file mode 100644
index 0000000..ac20a20
--- /dev/null
+++ b/openbb_lab/__init__.py
@@ -0,0 +1,39 @@
+"""Convenience exports for the OpenBB Lab helpers.
+
+This lightweight package exposes the high-level helpers that are designed to
+be called from notebooks or foreign language bindings. The modules live under
+``openbb_lab.platform`` but re-exporting them here keeps the public import path
+short and backwards compatible with the older SDK layout.
+"""
+
+from .platform.engine import (
+    BacktestEngine,
+    BacktestResult,
+    OrderTicket,
+    RiskAssessment,
+    RiskEngine,
+    risk_check,
+    run_backtest,
+)
+from .platform.data_plane import (
+    BAR_SCHEMA,
+    DEPTH_SCHEMA,
+    FlightCatalog,
+    TICK_SCHEMA,
+    flight_register_table,
+)
+
+__all__ = [
+    "BAR_SCHEMA",
+    "BacktestEngine",
+    "BacktestResult",
+    "DEPTH_SCHEMA",
+    "FlightCatalog",
+    "OrderTicket",
+    "RiskAssessment",
+    "RiskEngine",
+    "TICK_SCHEMA",
+    "flight_register_table",
+    "risk_check",
+    "run_backtest",
+]
diff --git a/openbb_lab/platform/__init__.py b/openbb_lab/platform/__init__.py
new file mode 100644
index 0000000..7d571b8
--- /dev/null
+++ b/openbb_lab/platform/__init__.py
@@ -0,0 +1,33 @@
+"""Platform-level helpers for OpenBB Lab."""
+
+from .engine import (
+    BacktestEngine,
+    BacktestResult,
+    OrderTicket,
+    RiskAssessment,
+    RiskEngine,
+    risk_check,
+    run_backtest,
+)
+from .data_plane import (
+    BAR_SCHEMA,
+    DEPTH_SCHEMA,
+    FlightCatalog,
+    TICK_SCHEMA,
+    flight_register_table,
+)
+
+__all__ = [
+    "BAR_SCHEMA",
+    "BacktestEngine",
+    "BacktestResult",
+    "DEPTH_SCHEMA",
+    "FlightCatalog",
+    "OrderTicket",
+    "RiskAssessment",
+    "RiskEngine",
+    "TICK_SCHEMA",
+    "flight_register_table",
+    "risk_check",
+    "run_backtest",
+]
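Because `openbb_lab`, `openbb_lab.platform`, and `openbb_lab.sdk` all re-export the same objects from the platform modules, the three import paths are interchangeable. A short sketch that checks this, purely for illustration:

```
# All three layers resolve to the same class defined in openbb_lab.platform.engine.
from openbb_lab import RiskEngine
from openbb_lab.platform import RiskEngine as PlatformRiskEngine
from openbb_lab.sdk import RiskEngine as SdkRiskEngine

assert RiskEngine is PlatformRiskEngine is SdkRiskEngine
```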
diff --git a/openbb_lab/platform/data_plane.py b/openbb_lab/platform/data_plane.py
new file mode 100644
index 0000000..feec041
--- /dev/null
+++ b/openbb_lab/platform/data_plane.py
@@ -0,0 +1,129 @@
+"""Data plane helpers built around Apache Arrow Flight."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import Any, Callable, Dict, Iterable, Mapping
+
+import importlib.util
+
+_pa_spec = importlib.util.find_spec("pyarrow")
+if _pa_spec is None:  # pragma: no cover - runtime guard
+    raise ModuleNotFoundError("The `pyarrow` package is required to work with Flight data.")
+
+import pyarrow as pa
+
+TICK_SCHEMA = pa.schema(
+    [
+        ("timestamp", pa.timestamp("ns")),
+        ("symbol", pa.string()),
+        ("price", pa.float64()),
+        ("size", pa.float64()),
+        ("exchange", pa.string()),
+        ("conditions", pa.list_(pa.string())),
+    ],
+)
+
+BAR_SCHEMA = pa.schema(
+    [
+        ("timestamp", pa.timestamp("ns")),
+        ("symbol", pa.string()),
+        ("open", pa.float64()),
+        ("high", pa.float64()),
+        ("low", pa.float64()),
+        ("close", pa.float64()),
+        ("volume", pa.float64()),
+        ("vwap", pa.float64()),
+        ("trades", pa.int64()),
+    ],
+)
+
+DEPTH_SCHEMA = pa.schema(
+    [
+        ("timestamp", pa.timestamp("ns")),
+        ("symbol", pa.string()),
+        ("side", pa.dictionary(pa.int8(), pa.string())),
+        ("price", pa.float64()),
+        ("size", pa.float64()),
+        ("level", pa.int32()),
+        ("orders", pa.int32()),
+    ],
+)
+
+_SCHEMA_ALIASES: Dict[str, pa.Schema] = {
+    "tick": TICK_SCHEMA,
+    "ticks": TICK_SCHEMA,
+    "trade": TICK_SCHEMA,
+    "trades": TICK_SCHEMA,
+    "bar": BAR_SCHEMA,
+    "bars": BAR_SCHEMA,
+    "candle": BAR_SCHEMA,
+    "depth": DEPTH_SCHEMA,
+    "orderbook": DEPTH_SCHEMA,
+    "book": DEPTH_SCHEMA,
+}
+
+
+@dataclass
+class FlightRegistration:
+    """Metadata stored for a Flight table registration."""
+
+    schema: pa.Schema
+    reader: Callable[..., pa.Table]
+    metadata: Mapping[str, Any] | None = None
+
+
+class FlightCatalog:
+    """Registry of Arrow Flight table providers."""
+
+    def __init__(self) -> None:
+        self._tables: Dict[str, FlightRegistration] = {}
+
+    def register(self, name: str, registration: FlightRegistration) -> None:
+        self._tables[name] = registration
+
+    def get(self, name: str) -> FlightRegistration:
+        return self._tables[name]
+
+    def list(self) -> Iterable[str]:
+        return self._tables.keys()
+
+
+def flight_register_table(
+    catalog: FlightCatalog,
+    name: str,
+    reader: Callable[..., pa.Table],
+    *,
+    schema: pa.Schema | str,
+    metadata: Mapping[str, Any] | None = None,
+) -> None:
+    """Register *reader* under *name* within *catalog* using an alias or explicit schema."""
+
+    if not callable(reader):
+        raise TypeError("Readers passed to flight_register_table must be callable.")
+
+    selected_schema = _resolve_schema(schema)
+    catalog.register(name, FlightRegistration(schema=selected_schema, reader=reader, metadata=metadata))
+
+
+def _resolve_schema(schema: pa.Schema | str) -> pa.Schema:
+    """Resolve schema aliases for common market data layouts."""
+
+    if isinstance(schema, pa.Schema):
+        return schema
+
+    alias = schema.lower()
+    if alias not in _SCHEMA_ALIASES:
+        raise KeyError(f"Unknown schema alias: {schema!r}")
+
+    return _SCHEMA_ALIASES[alias]
+
+
+__all__ = [
+    "BAR_SCHEMA",
+    "DEPTH_SCHEMA",
+    "FlightCatalog",
+    "FlightRegistration",
+    "TICK_SCHEMA",
+    "flight_register_table",
+]
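A small usage sketch for the catalog: register a reader under a schema alias and read the registration back. The `read_ticks` function and the `demo_ticks` name are hypothetical; `Schema.empty_table()` simply builds an empty table with the tick layout:

```
import pyarrow as pa

from openbb_lab.platform.data_plane import TICK_SCHEMA, FlightCatalog, flight_register_table

def read_ticks() -> pa.Table:
    # Stand-in reader that returns an empty table with the tick columns.
    return TICK_SCHEMA.empty_table()

catalog = FlightCatalog()
flight_register_table(catalog, "demo_ticks", read_ticks, schema="ticks")

registration = catalog.get("demo_ticks")
assert registration.schema is TICK_SCHEMA  # "ticks" resolves to the shared schema object
assert "demo_ticks" in list(catalog.list())
```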
diff --git a/openbb_lab/platform/engine.py b/openbb_lab/platform/engine.py
new file mode 100644
index 0000000..eb194f5
--- /dev/null
+++ b/openbb_lab/platform/engine.py
@@ -0,0 +1,253 @@
+"""Core engines and helper entry points for OpenBB Lab workflows."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, Callable, Iterable, Mapping, MutableMapping, Sequence
+
+import importlib.util
+
+_pa_spec = importlib.util.find_spec("pyarrow")
+if _pa_spec is None:  # pragma: no cover - runtime guard
+    raise ModuleNotFoundError("The `pyarrow` package is required to use the engine helpers.")
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+_pd_spec = importlib.util.find_spec("pandas")
+if _pd_spec is not None:  # pragma: no cover - optional dependency
+    import pandas as pd
+else:  # pragma: no cover - pandas is optional
+    pd = None  # type: ignore[assignment]
+
+
+@dataclass
+class OrderTicket:
+    """Basic representation of an order to be validated by the risk engine."""
+
+    symbol: str
+    quantity: float
+    side: str
+    price: float | None = None
+    metadata: MutableMapping[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class RiskAssessment:
+    """Container returned by :func:`risk_check` with validation results."""
+
+    approved: bool
+    tickets: list[OrderTicket]
+    violations: list[str] = field(default_factory=list)
+    warnings: list[str] = field(default_factory=list)
+    statistics: MutableMapping[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class BacktestResult:
+    """Structured result returned by :class:`BacktestEngine`."""
+
+    tables: Mapping[str, pa.Table]
+    metrics: Mapping[str, Any] = field(default_factory=dict)
+    metadata: Mapping[str, Any] = field(default_factory=dict)
+
+    def iter_tables(self) -> Iterable[tuple[str, pa.Table]]:
+        """Yield the materialised result tables."""
+
+        yield from self.tables.items()
+
+
+class BacktestEngine:
+    """Minimal backtest runner used by :func:`run_backtest`."""
+
+    def __init__(
+        self,
+        *,
+        metrics: Mapping[str, Callable[[pa.Table], Any]] | None = None,
+    ) -> None:
+        self._metrics = metrics or {}
+
+    def run(
+        self,
+        data: pa.Table | Mapping[str, Any] | Sequence[Mapping[str, Any]],
+        strategy: Callable[[pa.Table], pa.Table] | None = None,
+    ) -> BacktestResult:
+        """Execute a backtest and collect tabular outputs."""
+
+        table = ensure_table(data)
+        tables: dict[str, pa.Table] = {"input": table}
+
+        if strategy is not None:
+            strategy_output = strategy(table)
+            tables["strategy"] = ensure_table(strategy_output)
+
+        computed_metrics: dict[str, Any] = {}
+        for name, calculator in self._metrics.items():
+            computed_metrics[name] = calculator(table)
+
+        return BacktestResult(tables=tables, metrics=computed_metrics)
+
+
+class RiskEngine:
+    """Simple risk model that validates :class:`OrderTicket` objects."""
+
+    def __init__(
+        self,
+        *,
+        max_quantity: float = 1_000_000,
+        max_notional: float | None = 10_000_000,
+        allowed_sides: Iterable[str] | None = None,
+    ) -> None:
+        self.max_quantity = float(max_quantity)
+        self.max_notional = float(max_notional) if max_notional is not None else None
+        self.allowed_sides = {side.lower() for side in (allowed_sides or {"buy", "sell", "short", "cover"})}
+
+    def assess(self, tickets: Sequence[OrderTicket]) -> RiskAssessment:
+        """Validate tickets and return an aggregated assessment."""
+
+        violations: list[str] = []
+        warnings: list[str] = []
+        gross_quantity = 0.0
+
+        for index, ticket in enumerate(tickets):
+            side = ticket.side.lower()
+            if side not in self.allowed_sides:
+                violations.append(f"ticket[{index}] has unsupported side '{ticket.side}'")
+
+            quantity = float(ticket.quantity)
+            if quantity == 0:
+                violations.append(f"ticket[{index}] has zero quantity")
+            elif abs(quantity) > self.max_quantity:
+                violations.append(
+                    f"ticket[{index}] quantity {quantity} exceeds limit {self.max_quantity}",
+                )
+
+            if ticket.price is not None and self.max_notional is not None:
+                notional = abs(quantity * float(ticket.price))
+                if notional > self.max_notional:
+                    violations.append(
+                        f"ticket[{index}] notional {notional} exceeds limit {self.max_notional}",
+                    )
+
+            gross_quantity += abs(quantity)
+
+        statistics: dict[str, Any] = {"gross_quantity": gross_quantity, "ticket_count": len(tickets)}
+        approved = not violations
+        return RiskAssessment(
+            approved=approved,
+            tickets=list(tickets),
+            violations=violations,
+            warnings=warnings,
+            statistics=statistics,
+        )
+
+
+def run_backtest(
+    engine: BacktestEngine,
+    /,
+    *args: Any,
+    output_dir: str | Path | None = None,
+    persist: bool = True,
+    **kwargs: Any,
+) -> BacktestResult:
+    """Execute a backtest and optionally persist the resulting tables."""
+
+    result = engine.run(*args, **kwargs)
+
+    if persist and output_dir is not None:
+        directory = Path(output_dir)
+        directory.mkdir(parents=True, exist_ok=True)
+        _write_backtest_outputs(result, directory)
+
+    return result
+
+
+def risk_check(
+    tickets: Iterable[OrderTicket],
+    /,
+    *,
+    engine: RiskEngine | None = None,
+) -> RiskAssessment:
+    """Validate order tickets through :class:`RiskEngine`."""
+
+    selected_engine = engine or RiskEngine()
+    ticket_list = [ticket if isinstance(ticket, OrderTicket) else _coerce_ticket(ticket) for ticket in tickets]
+    return selected_engine.assess(ticket_list)
+
+
+def ensure_table(data: Any) -> pa.Table:
+    """Convert supported inputs into a :class:`pyarrow.Table`."""
+
+    if isinstance(data, pa.Table):
+        return data
+
+    if isinstance(data, pa.RecordBatch):
+        return pa.Table.from_batches([data])
+
+    if pd is not None and isinstance(data, pd.DataFrame):  # pragma: no cover - optional dependency
+        return pa.Table.from_pandas(data, preserve_index=False)
+
+    if isinstance(data, Mapping):
+        return pa.table(data)
+
+    if isinstance(data, Sequence):
+        sequence = list(data)
+        if sequence and isinstance(sequence[0], Mapping):
+            return pa.Table.from_pylist(sequence)
+        raise TypeError("Sequence inputs must contain mapping items to be converted to a table.")
+
+    raise TypeError(f"Unsupported data type {type(data)!r} for Arrow conversion.")
+
+
+def _write_backtest_outputs(result: BacktestResult, directory: Path) -> None:
+    """Persist the tabular payload from *result* to ``directory`` as Parquet files."""
+
+    for name, table in _extract_tables(result).items():
+        pq.write_table(ensure_table(table), directory / f"{name}.parquet")
+
+
+def _extract_tables(result: BacktestResult | Mapping[str, Any]) -> Mapping[str, Any]:
+    """Return a mapping of table names from the provided *result*."""
+
+    if isinstance(result, BacktestResult):
+        return result.tables
+
+    if isinstance(result, Mapping):
+        return result
+
+    tables_attr = getattr(result, "tables", None)
+    if isinstance(tables_attr, Mapping):
+        return tables_attr
+
+    raise TypeError("Backtest results must expose a mapping of tables.")
+
+
+def _coerce_ticket(raw: Any) -> OrderTicket:
+    """Coerce dictionaries into :class:`OrderTicket` instances."""
+
+    if isinstance(raw, OrderTicket):
+        return raw
+
+    if isinstance(raw, Mapping):
+        return OrderTicket(
+            symbol=str(raw.get("symbol", "")),
+            quantity=float(raw.get("quantity", 0)),
+            side=str(raw.get("side", "")),
+            price=float(raw["price"]) if "price" in raw and raw["price"] is not None else None,
+            metadata={key: value for key, value in raw.items() if key not in {"symbol", "quantity", "side", "price"}},
+        )
+
+    raise TypeError("Tickets must be OrderTicket instances or mapping objects.")
+
+
+__all__ = [
+    "BacktestEngine",
+    "BacktestResult",
+    "OrderTicket",
+    "RiskAssessment",
+    "RiskEngine",
+    "ensure_table",
+    "risk_check",
+    "run_backtest",
+]
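The risk limits default to generous values, so a quick way to see a violation is to construct a `RiskEngine` with a tighter limit and pass it to `risk_check`. The numbers below are illustrative only:

```
from openbb_lab.platform.engine import OrderTicket, RiskEngine, risk_check

engine = RiskEngine(max_quantity=100)  # hypothetical, tighter-than-default limit
assessment = risk_check(
    [OrderTicket(symbol="AAPL", quantity=250, side="buy", price=188.2)],
    engine=engine,
)

assert not assessment.approved
assert assessment.violations  # e.g. "ticket[0] quantity 250.0 exceeds limit 100.0"
assert assessment.statistics["ticket_count"] == 1
```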
diff --git a/openbb_lab/sdk.py b/openbb_lab/sdk.py
new file mode 100644
index 0000000..d9dec57
--- /dev/null
+++ b/openbb_lab/sdk.py
@@ -0,0 +1,35 @@
+"""Compatibility layer that mirrors the public SDK surface."""
+
+from __future__ import annotations
+
+from .platform.data_plane import (
+    BAR_SCHEMA,
+    DEPTH_SCHEMA,
+    FlightCatalog,
+    TICK_SCHEMA,
+    flight_register_table,
+)
+from .platform.engine import (
+    BacktestEngine,
+    BacktestResult,
+    OrderTicket,
+    RiskAssessment,
+    RiskEngine,
+    risk_check,
+    run_backtest,
+)
+
+__all__ = [
+    "BAR_SCHEMA",
+    "BacktestEngine",
+    "BacktestResult",
+    "DEPTH_SCHEMA",
+    "FlightCatalog",
+    "OrderTicket",
+    "RiskAssessment",
+    "RiskEngine",
+    "TICK_SCHEMA",
+    "flight_register_table",
+    "risk_check",
+    "run_backtest",
+]
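An end-to-end sketch through the `openbb_lab.sdk` surface, using an in-memory Arrow table and a trivial metric; the `prices` data and the `rows` metric are illustrative only:

```
import pyarrow as pa

from openbb_lab.sdk import BacktestEngine, run_backtest

prices = pa.table({"close": [101.0, 102.5, 101.8]})

# Metrics are callables that receive the input table; persist=False skips Parquet output.
engine = BacktestEngine(metrics={"rows": lambda table: table.num_rows})
result = run_backtest(engine, data=prices, persist=False)

assert result.metrics["rows"] == 3
assert "input" in dict(result.iter_tables())
```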