diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..43ae0e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.py[cod] diff --git a/openbb_lab/__init__.py b/openbb_lab/__init__.py new file mode 100644 index 0000000..d860cd0 --- /dev/null +++ b/openbb_lab/__init__.py @@ -0,0 +1,3 @@ +"""OpenBB Lab package initialization.""" + +__all__ = [] diff --git a/openbb_lab/cli.py b/openbb_lab/cli.py new file mode 100644 index 0000000..0cdbe1c --- /dev/null +++ b/openbb_lab/cli.py @@ -0,0 +1,142 @@ +"""Command line entry points for interacting with broker gateways.""" + +from __future__ import annotations + +import argparse +import json +from dataclasses import asdict, is_dataclass +from typing import Any, Iterable + +from .platform import broker_gateway +from .platform.contracts import OrderModification + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="OpenBB Lab Broker Gateway CLI") + subparsers = parser.add_subparsers(dest="command") + + _add_ibkr_commands(subparsers) + _add_tradovate_commands(subparsers) + + return parser + + +def _add_ibkr_commands(subparsers: argparse._SubParsersAction[argparse.ArgumentParser]) -> None: + cancel_parser = subparsers.add_parser("ibkr_cancel", help="Cancel an IBKR order") + cancel_parser.add_argument("order_id", help="Order identifier") + cancel_parser.set_defaults(handler=_handle_ibkr_cancel, broker='ibkr') + + modify_parser = subparsers.add_parser("ibkr_modify", help="Modify an IBKR order") + modify_parser.add_argument("order_id", help="Order identifier") + modify_parser.add_argument("--quantity", type=float, help="New quantity", dest="quantity") + modify_parser.add_argument("--price", type=float, help="New price", dest="price") + modify_parser.set_defaults(handler=_handle_ibkr_modify, broker='ibkr') + + ws_parser = subparsers.add_parser("ibkr_ws_listen", help="Listen to IBKR websocket events") + ws_parser.set_defaults(handler=_handle_ibkr_ws_listen, broker='ibkr') + + +def _add_tradovate_commands( + subparsers: argparse._SubParsersAction[argparse.ArgumentParser], +) -> None: + cancel_parser = subparsers.add_parser("tradovate_cancel", help="Cancel a Tradovate order") + cancel_parser.add_argument("order_id", help="Order identifier") + cancel_parser.set_defaults(handler=_handle_tradovate_cancel, broker='tradovate') + + modify_parser = subparsers.add_parser("tradovate_modify", help="Modify a Tradovate order") + modify_parser.add_argument("order_id", help="Order identifier") + modify_parser.add_argument("--quantity", type=float, help="New quantity", dest="quantity") + modify_parser.add_argument("--price", type=float, help="New price", dest="price") + modify_parser.set_defaults(handler=_handle_tradovate_modify, broker='tradovate') + + ws_parser = subparsers.add_parser( + "tradovate_ws_listen", + help="Listen to Tradovate websocket events", + ) + ws_parser.set_defaults(handler=_handle_tradovate_ws_listen, broker='tradovate') + + +# --------------------------------------------------------------------------- +# Command handlers +# --------------------------------------------------------------------------- + +def _handle_ibkr_cancel(args: argparse.Namespace) -> Any: + result = broker_gateway.cancel("ibkr", args.order_id) + return _serialize(result) + + +def _handle_ibkr_modify(args: argparse.Namespace) -> Any: + modification = OrderModification( + order_id=args.order_id, + quantity=args.quantity, + price=args.price, + ) + result = broker_gateway.modify("ibkr", modification) + return _serialize(result) + + +def _handle_ibkr_ws_listen(_: argparse.Namespace) -> Any: + events = list(broker_gateway.websocket_listen("ibkr")) + return events + + +def _handle_tradovate_cancel(args: argparse.Namespace) -> Any: + result = broker_gateway.cancel("tradovate", args.order_id) + return _serialize(result) + + +def _handle_tradovate_modify(args: argparse.Namespace) -> Any: + modification = OrderModification( + order_id=args.order_id, + quantity=args.quantity, + price=args.price, + ) + result = broker_gateway.modify("tradovate", modification) + return _serialize(result) + + +def _handle_tradovate_ws_listen(_: argparse.Namespace) -> Any: + events = list(broker_gateway.websocket_listen("tradovate")) + return events + + +# --------------------------------------------------------------------------- +# CLI Entrypoint +# --------------------------------------------------------------------------- + +def _serialize(data: Any) -> Any: + if is_dataclass(data): + return asdict(data) + if isinstance(data, dict): + return {key: _serialize(value) for key, value in data.items()} + if isinstance(data, Iterable) and not isinstance(data, (str, bytes)): + return [_serialize(item) for item in data] + return data + + +def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + handler = getattr(args, "handler", None) + if handler is None: + parser.print_help() + return 1 + + try: + result = handler(args) + except Exception as error: # pragma: no cover - CLI surface + broker = getattr(args, 'broker', None) + if broker: + message = broker_gateway.normalize_error(broker, error) + else: + message = str(error) + print(message) + return 1 + + if result is not None: + print(json.dumps(result, indent=2, default=str)) + return 0 + + +if __name__ == "__main__": # pragma: no cover - CLI entrypoint + raise SystemExit(main()) diff --git a/openbb_lab/platform/__init__.py b/openbb_lab/platform/__init__.py new file mode 100644 index 0000000..ac51d3f --- /dev/null +++ b/openbb_lab/platform/__init__.py @@ -0,0 +1,25 @@ +"""Platform package exposing broker gateways.""" + +from . import broker_gateway # noqa: F401 +from .contracts import ( + CancelResult, + Order, + OrderModification, + OrderRequest, + OrderSide, + OrderStatus, + OrderType, + Position, +) + +__all__ = [ + "broker_gateway", + "CancelResult", + "Order", + "OrderModification", + "OrderRequest", + "OrderSide", + "OrderStatus", + "OrderType", + "Position", +] diff --git a/openbb_lab/platform/broker_gateway.py b/openbb_lab/platform/broker_gateway.py new file mode 100644 index 0000000..cfd4db0 --- /dev/null +++ b/openbb_lab/platform/broker_gateway.py @@ -0,0 +1,105 @@ +"""Facade that routes requests to concrete broker gateways.""" + +from __future__ import annotations + +from typing import Dict, Iterable, Iterator, Protocol + +from .contracts import ( + CancelResult, + Order, + OrderModification, + OrderRequest, + Position, +) +from .ibkr_gateway import IBKRGateway +from .tradovate_gateway import TradovateGateway + + +class SupportsBrokerGateway(Protocol): + """Protocol describing the surface exposed by broker gateways.""" + + name: str + + def route_order(self, order: OrderRequest) -> Order: ... + + def cancel(self, order_id: str) -> CancelResult: ... + + def modify(self, modification: OrderModification) -> Order: ... + + def positions_stream(self) -> Iterable[Position]: ... + + def websocket_listen(self) -> Iterator[Dict[str, str]]: ... + + @staticmethod + def normalize_error(error: Exception) -> str: ... + + +_GATEWAYS: Dict[str, SupportsBrokerGateway] = {} + + +def register_gateway(gateway: SupportsBrokerGateway) -> None: + """Register a gateway for broker level routing.""" + + _GATEWAYS[gateway.name.lower()] = gateway + + +def get_gateway(broker: str) -> SupportsBrokerGateway: + """Return the registered gateway for *broker*.""" + + try: + return _GATEWAYS[broker.lower()] + except KeyError as error: # pragma: no cover - defensive branch + raise KeyError(f"Broker '{broker}' is not registered") from error + + +def route_order(request: OrderRequest) -> Order: + """Route an order to the appropriate broker gateway.""" + + return get_gateway(request.broker).route_order(request) + + +def cancel(broker: str, order_id: str) -> CancelResult: + """Cancel an order via the registered broker gateway.""" + + return get_gateway(broker).cancel(order_id) + + +def modify(broker: str, modification: OrderModification) -> Order: + """Modify an order via the registered broker gateway.""" + + return get_gateway(broker).modify(modification) + + +def positions_stream(broker: str) -> Iterable[Position]: + """Return an iterable of positions from the broker gateway.""" + + return get_gateway(broker).positions_stream() + + +def normalize_error(broker: str, error: Exception) -> str: + """Normalize a broker specific exception to a human readable string.""" + + return get_gateway(broker).normalize_error(error) + + +def websocket_listen(broker: str) -> Iterator[Dict[str, str]]: + """Yield events from a broker specific websocket listener.""" + + return get_gateway(broker).websocket_listen() + + +# Register default gateways on import so they are available to the CLI. +register_gateway(IBKRGateway()) +register_gateway(TradovateGateway()) + + +__all__ = [ + "register_gateway", + "get_gateway", + "route_order", + "cancel", + "modify", + "positions_stream", + "normalize_error", + "websocket_listen", +] diff --git a/openbb_lab/platform/contracts.py b/openbb_lab/platform/contracts.py new file mode 100644 index 0000000..10cdb7f --- /dev/null +++ b/openbb_lab/platform/contracts.py @@ -0,0 +1,94 @@ +"""Shared platform contracts for broker integrations.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, Optional + + +class OrderSide(str, Enum): + """Order directions supported by the platform.""" + + BUY = "buy" + SELL = "sell" + + +class OrderType(str, Enum): + """Supported order types.""" + + MARKET = "market" + LIMIT = "limit" + + +class OrderStatus(str, Enum): + """Common order status values across brokers.""" + + NEW = "new" + PARTIALLY_FILLED = "partially_filled" + FILLED = "filled" + CANCELLED = "cancelled" + REJECTED = "rejected" + + +@dataclass(slots=True) +class OrderRequest: + """An order request emitted by a strategy or API consumer.""" + + broker: str + symbol: str + side: OrderSide + quantity: float + order_type: OrderType = OrderType.MARKET + price: Optional[float] = None + client_order_id: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class OrderModification: + """User intent to update an existing order.""" + + order_id: str + quantity: Optional[float] = None + price: Optional[float] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class Order: + """Normalized order representation shared by all broker gateways.""" + + broker: str + order_id: str + symbol: str + side: OrderSide + quantity: float + order_type: OrderType + status: OrderStatus + price: Optional[float] = None + filled_quantity: float = 0.0 + average_price: Optional[float] = None + client_order_id: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class CancelResult: + """The result of cancelling an order.""" + + broker: str + order_id: str + status: OrderStatus + message: Optional[str] = None + + +@dataclass(slots=True) +class Position: + """Normalized broker position representation.""" + + broker: str + symbol: str + quantity: float + average_price: float + metadata: Dict[str, Any] = field(default_factory=dict) diff --git a/openbb_lab/platform/ibkr_client.py b/openbb_lab/platform/ibkr_client.py new file mode 100644 index 0000000..a51d0c2 --- /dev/null +++ b/openbb_lab/platform/ibkr_client.py @@ -0,0 +1,125 @@ +"""Simplified IBKR client used for testing broker gateway behavior.""" + +from __future__ import annotations + +from typing import Dict, Iterator, List + +from .contracts import ( + CancelResult, + Order, + OrderModification, + OrderRequest, + OrderSide, + OrderStatus, + OrderType, + Position, +) + + +class IBKRClient: + """Tiny in-memory representation of the Interactive Brokers client.""" + + def __init__(self) -> None: + self._orders: Dict[str, Order] = {} + self._sequence: int = 1 + + # ------------------------------------------------------------------ + # Order management + # ------------------------------------------------------------------ + def place_order(self, request: OrderRequest) -> Order: + """Place an order and track it locally.""" + + order_id = request.client_order_id or f"IBKR-{self._sequence}" + self._sequence += 1 + order = Order( + broker="ibkr", + order_id=order_id, + symbol=request.symbol, + side=request.side, + quantity=request.quantity, + order_type=request.order_type, + status=OrderStatus.NEW, + price=request.price, + client_order_id=request.client_order_id, + metadata=dict(request.metadata), + ) + self._orders[order_id] = order + return order + + def cancel_order(self, order_id: str) -> CancelResult: + """Cancel a tracked order.""" + + try: + order = self._orders[order_id] + except KeyError as error: # pragma: no cover - defensive branch + raise KeyError(f"Order {order_id} was not found") from error + order.status = OrderStatus.CANCELLED + return CancelResult( + broker="ibkr", + order_id=order_id, + status=OrderStatus.CANCELLED, + ) + + def modify_order(self, modification: OrderModification) -> Order: + """Modify an existing order.""" + + try: + order = self._orders[modification.order_id] + except KeyError as error: # pragma: no cover - defensive branch + raise KeyError(f"Order {modification.order_id} was not found") from error + + if modification.quantity is not None: + order.quantity = modification.quantity + if modification.price is not None: + order.price = modification.price + if modification.metadata: + order.metadata.update(modification.metadata) + return order + + # ------------------------------------------------------------------ + # Data streaming helpers + # ------------------------------------------------------------------ + def stream_positions(self) -> Iterator[Position]: + """Yield open positions based on currently active orders.""" + + for position in self._calculate_positions(): + yield position + + def websocket_listen(self) -> Iterator[Dict[str, str]]: + """Yield pseudo WebSocket events for tests and demos.""" + + for order in self._orders.values(): + yield { + "event": "order", + "broker": "ibkr", + "order_id": order.order_id, + "status": order.status.value, + } + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _calculate_positions(self) -> List[Position]: + aggregates: Dict[str, float] = {} + for order in self._orders.values(): + if order.status == OrderStatus.CANCELLED: + continue + direction = 1 if order.side == OrderSide.BUY else -1 + aggregates[order.symbol] = aggregates.get(order.symbol, 0.0) + direction * order.quantity + + positions: List[Position] = [] + for symbol, quantity in aggregates.items(): + if quantity == 0: + continue + positions.append( + Position( + broker="ibkr", + symbol=symbol, + quantity=quantity, + average_price=1.0 if quantity else 0.0, + ) + ) + return positions + + +__all__ = ["IBKRClient"] diff --git a/openbb_lab/platform/ibkr_gateway.py b/openbb_lab/platform/ibkr_gateway.py new file mode 100644 index 0000000..f2e6cc0 --- /dev/null +++ b/openbb_lab/platform/ibkr_gateway.py @@ -0,0 +1,46 @@ +"""Broker gateway implementation for Interactive Brokers.""" + +from __future__ import annotations + +from typing import Dict, Iterable, Iterator + +from .contracts import ( + CancelResult, + Order, + OrderModification, + OrderRequest, + Position, +) +from .ibkr_client import IBKRClient + + +class IBKRGateway: + """Adapter between the platform facade and the IBKR client.""" + + name = "ibkr" + + def __init__(self, client: IBKRClient | None = None) -> None: + self.client = client or IBKRClient() + + def route_order(self, order: OrderRequest) -> Order: + return self.client.place_order(order) + + def cancel(self, order_id: str) -> CancelResult: + return self.client.cancel_order(order_id) + + def modify(self, modification: OrderModification) -> Order: + return self.client.modify_order(modification) + + def positions_stream(self) -> Iterable[Position]: + return self.client.stream_positions() + + def websocket_listen(self) -> Iterator[Dict[str, str]]: + return self.client.websocket_listen() + + @staticmethod + def normalize_error(error: Exception) -> str: + message = error.args[0] if isinstance(error, KeyError) and error.args else str(error) + return f"IBKR: {message}" + + +__all__ = ["IBKRGateway"] diff --git a/openbb_lab/platform/tradovate_client.py b/openbb_lab/platform/tradovate_client.py new file mode 100644 index 0000000..827c7ff --- /dev/null +++ b/openbb_lab/platform/tradovate_client.py @@ -0,0 +1,101 @@ +"""Simplified Tradovate client used by the broker gateway facade.""" + +from __future__ import annotations + +from typing import Dict, Iterator, List + +from .contracts import ( + CancelResult, + Order, + OrderModification, + OrderRequest, + OrderSide, + OrderStatus, + Position, +) + + +class TradovateClient: + """In-memory representation of a Tradovate client.""" + + def __init__(self) -> None: + self._orders: Dict[str, Order] = {} + self._sequence: int = 1 + + def place_order(self, request: OrderRequest) -> Order: + order_id = request.client_order_id or f"TRADOVATE-{self._sequence}" + self._sequence += 1 + order = Order( + broker="tradovate", + order_id=order_id, + symbol=request.symbol, + side=request.side, + quantity=request.quantity, + order_type=request.order_type, + status=OrderStatus.NEW, + price=request.price, + client_order_id=request.client_order_id, + metadata=dict(request.metadata), + ) + self._orders[order_id] = order + return order + + def cancel_order(self, order_id: str) -> CancelResult: + try: + order = self._orders[order_id] + except KeyError as error: # pragma: no cover - defensive branch + raise KeyError(f"Order {order_id} was not found") from error + order.status = OrderStatus.CANCELLED + return CancelResult(broker="tradovate", order_id=order_id, status=OrderStatus.CANCELLED) + + def modify_order(self, modification: OrderModification) -> Order: + try: + order = self._orders[modification.order_id] + except KeyError as error: # pragma: no cover - defensive branch + raise KeyError(f"Order {modification.order_id} was not found") from error + + if modification.quantity is not None: + order.quantity = modification.quantity + if modification.price is not None: + order.price = modification.price + if modification.metadata: + order.metadata.update(modification.metadata) + return order + + def stream_positions(self) -> Iterator[Position]: + for position in self._calculate_positions(): + yield position + + def websocket_listen(self) -> Iterator[Dict[str, str]]: + for order in self._orders.values(): + yield { + "event": "order", + "broker": "tradovate", + "order_id": order.order_id, + "status": order.status.value, + } + + def _calculate_positions(self) -> List[Position]: + aggregates: Dict[str, float] = {} + for order in self._orders.values(): + if order.status == OrderStatus.CANCELLED: + continue + direction = 1 if order.side == OrderSide.BUY else -1 + aggregates[order.symbol] = aggregates.get(order.symbol, 0.0) + direction * order.quantity + + positions: List[Position] = [] + for symbol, quantity in aggregates.items(): + if quantity == 0: + continue + positions.append( + Position( + broker="tradovate", + symbol=symbol, + quantity=quantity, + average_price=1.0 if quantity else 0.0, + ) + ) + return positions + + +__all__ = ["TradovateClient"] diff --git a/openbb_lab/platform/tradovate_gateway.py b/openbb_lab/platform/tradovate_gateway.py new file mode 100644 index 0000000..b235cd9 --- /dev/null +++ b/openbb_lab/platform/tradovate_gateway.py @@ -0,0 +1,46 @@ +"""Broker gateway implementation for Tradovate.""" + +from __future__ import annotations + +from typing import Dict, Iterable, Iterator + +from .contracts import ( + CancelResult, + Order, + OrderModification, + OrderRequest, + Position, +) +from .tradovate_client import TradovateClient + + +class TradovateGateway: + """Adapter that exposes Tradovate client functionality.""" + + name = "tradovate" + + def __init__(self, client: TradovateClient | None = None) -> None: + self.client = client or TradovateClient() + + def route_order(self, order: OrderRequest) -> Order: + return self.client.place_order(order) + + def cancel(self, order_id: str) -> CancelResult: + return self.client.cancel_order(order_id) + + def modify(self, modification: OrderModification) -> Order: + return self.client.modify_order(modification) + + def positions_stream(self) -> Iterable[Position]: + return self.client.stream_positions() + + def websocket_listen(self) -> Iterator[Dict[str, str]]: + return self.client.websocket_listen() + + @staticmethod + def normalize_error(error: Exception) -> str: + message = error.args[0] if isinstance(error, KeyError) and error.args else str(error) + return f"Tradovate: {message}" + + +__all__ = ["TradovateGateway"]