Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
__pycache__/
*.py[cod]
3 changes: 3 additions & 0 deletions openbb_lab/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"""OpenBB Lab package initialization."""

__all__ = []
142 changes: 142 additions & 0 deletions openbb_lab/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Command line entry points for interacting with broker gateways."""

from __future__ import annotations

import argparse
import json
from dataclasses import asdict, is_dataclass
from typing import Any, Iterable

from .platform import broker_gateway
from .platform.contracts import OrderModification


def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="OpenBB Lab Broker Gateway CLI")
subparsers = parser.add_subparsers(dest="command")

_add_ibkr_commands(subparsers)
_add_tradovate_commands(subparsers)

return parser


def _add_ibkr_commands(subparsers: argparse._SubParsersAction[argparse.ArgumentParser]) -> None:
cancel_parser = subparsers.add_parser("ibkr_cancel", help="Cancel an IBKR order")
cancel_parser.add_argument("order_id", help="Order identifier")
cancel_parser.set_defaults(handler=_handle_ibkr_cancel, broker='ibkr')

modify_parser = subparsers.add_parser("ibkr_modify", help="Modify an IBKR order")
modify_parser.add_argument("order_id", help="Order identifier")
modify_parser.add_argument("--quantity", type=float, help="New quantity", dest="quantity")
modify_parser.add_argument("--price", type=float, help="New price", dest="price")
modify_parser.set_defaults(handler=_handle_ibkr_modify, broker='ibkr')

ws_parser = subparsers.add_parser("ibkr_ws_listen", help="Listen to IBKR websocket events")
ws_parser.set_defaults(handler=_handle_ibkr_ws_listen, broker='ibkr')


def _add_tradovate_commands(
subparsers: argparse._SubParsersAction[argparse.ArgumentParser],
) -> None:
cancel_parser = subparsers.add_parser("tradovate_cancel", help="Cancel a Tradovate order")
cancel_parser.add_argument("order_id", help="Order identifier")
cancel_parser.set_defaults(handler=_handle_tradovate_cancel, broker='tradovate')

modify_parser = subparsers.add_parser("tradovate_modify", help="Modify a Tradovate order")
modify_parser.add_argument("order_id", help="Order identifier")
modify_parser.add_argument("--quantity", type=float, help="New quantity", dest="quantity")
modify_parser.add_argument("--price", type=float, help="New price", dest="price")
modify_parser.set_defaults(handler=_handle_tradovate_modify, broker='tradovate')

ws_parser = subparsers.add_parser(
"tradovate_ws_listen",
help="Listen to Tradovate websocket events",
)
ws_parser.set_defaults(handler=_handle_tradovate_ws_listen, broker='tradovate')


# ---------------------------------------------------------------------------
# Command handlers
# ---------------------------------------------------------------------------

def _handle_ibkr_cancel(args: argparse.Namespace) -> Any:
result = broker_gateway.cancel("ibkr", args.order_id)
return _serialize(result)


def _handle_ibkr_modify(args: argparse.Namespace) -> Any:
modification = OrderModification(
order_id=args.order_id,
quantity=args.quantity,
price=args.price,
)
result = broker_gateway.modify("ibkr", modification)
return _serialize(result)


def _handle_ibkr_ws_listen(_: argparse.Namespace) -> Any:
events = list(broker_gateway.websocket_listen("ibkr"))
return events
Comment on lines +78 to +80

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Stream websocket events instead of materializing the iterator

The websocket listener handler converts broker_gateway.websocket_listen("ibkr") into a list and returns it. Broker websocket streams typically never terminate, so this call will block until the stream closes and will retain every event in memory before anything is printed. Iterating over the iterator and emitting each event as it arrives would allow the CLI command to display live updates and avoid unbounded memory growth. (The Tradovate handler mirrors the same pattern.)

Useful? React with 👍 / 👎.



def _handle_tradovate_cancel(args: argparse.Namespace) -> Any:
result = broker_gateway.cancel("tradovate", args.order_id)
return _serialize(result)


def _handle_tradovate_modify(args: argparse.Namespace) -> Any:
modification = OrderModification(
order_id=args.order_id,
quantity=args.quantity,
price=args.price,
)
result = broker_gateway.modify("tradovate", modification)
return _serialize(result)


def _handle_tradovate_ws_listen(_: argparse.Namespace) -> Any:
events = list(broker_gateway.websocket_listen("tradovate"))
return events


# ---------------------------------------------------------------------------
# CLI Entrypoint
# ---------------------------------------------------------------------------

def _serialize(data: Any) -> Any:
if is_dataclass(data):
return asdict(data)
if isinstance(data, dict):
return {key: _serialize(value) for key, value in data.items()}
if isinstance(data, Iterable) and not isinstance(data, (str, bytes)):
return [_serialize(item) for item in data]
return data


def main(argv: list[str] | None = None) -> int:
parser = _build_parser()
args = parser.parse_args(argv)
handler = getattr(args, "handler", None)
if handler is None:
parser.print_help()
return 1

try:
result = handler(args)
except Exception as error: # pragma: no cover - CLI surface
broker = getattr(args, 'broker', None)
if broker:
message = broker_gateway.normalize_error(broker, error)
else:
message = str(error)
print(message)
return 1

if result is not None:
print(json.dumps(result, indent=2, default=str))
return 0


if __name__ == "__main__": # pragma: no cover - CLI entrypoint
raise SystemExit(main())
25 changes: 25 additions & 0 deletions openbb_lab/platform/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Platform package exposing broker gateways."""

from . import broker_gateway # noqa: F401
from .contracts import (
CancelResult,
Order,
OrderModification,
OrderRequest,
OrderSide,
OrderStatus,
OrderType,
Position,
)

__all__ = [
"broker_gateway",
"CancelResult",
"Order",
"OrderModification",
"OrderRequest",
"OrderSide",
"OrderStatus",
"OrderType",
"Position",
]
105 changes: 105 additions & 0 deletions openbb_lab/platform/broker_gateway.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""Facade that routes requests to concrete broker gateways."""

from __future__ import annotations

from typing import Dict, Iterable, Iterator, Protocol

from .contracts import (
CancelResult,
Order,
OrderModification,
OrderRequest,
Position,
)
from .ibkr_gateway import IBKRGateway
from .tradovate_gateway import TradovateGateway


class SupportsBrokerGateway(Protocol):
"""Protocol describing the surface exposed by broker gateways."""

name: str

def route_order(self, order: OrderRequest) -> Order: ...

def cancel(self, order_id: str) -> CancelResult: ...

def modify(self, modification: OrderModification) -> Order: ...

def positions_stream(self) -> Iterable[Position]: ...

def websocket_listen(self) -> Iterator[Dict[str, str]]: ...

@staticmethod
def normalize_error(error: Exception) -> str: ...


_GATEWAYS: Dict[str, SupportsBrokerGateway] = {}


def register_gateway(gateway: SupportsBrokerGateway) -> None:
"""Register a gateway for broker level routing."""

_GATEWAYS[gateway.name.lower()] = gateway


def get_gateway(broker: str) -> SupportsBrokerGateway:
"""Return the registered gateway for *broker*."""

try:
return _GATEWAYS[broker.lower()]
except KeyError as error: # pragma: no cover - defensive branch
raise KeyError(f"Broker '{broker}' is not registered") from error


def route_order(request: OrderRequest) -> Order:
"""Route an order to the appropriate broker gateway."""

return get_gateway(request.broker).route_order(request)


def cancel(broker: str, order_id: str) -> CancelResult:
"""Cancel an order via the registered broker gateway."""

return get_gateway(broker).cancel(order_id)


def modify(broker: str, modification: OrderModification) -> Order:
"""Modify an order via the registered broker gateway."""

return get_gateway(broker).modify(modification)


def positions_stream(broker: str) -> Iterable[Position]:
"""Return an iterable of positions from the broker gateway."""

return get_gateway(broker).positions_stream()


def normalize_error(broker: str, error: Exception) -> str:
"""Normalize a broker specific exception to a human readable string."""

return get_gateway(broker).normalize_error(error)


def websocket_listen(broker: str) -> Iterator[Dict[str, str]]:
"""Yield events from a broker specific websocket listener."""

return get_gateway(broker).websocket_listen()


# Register default gateways on import so they are available to the CLI.
register_gateway(IBKRGateway())
register_gateway(TradovateGateway())


__all__ = [
"register_gateway",
"get_gateway",
"route_order",
"cancel",
"modify",
"positions_stream",
"normalize_error",
"websocket_listen",
]
94 changes: 94 additions & 0 deletions openbb_lab/platform/contracts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""Shared platform contracts for broker integrations."""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class OrderSide(str, Enum):
"""Order directions supported by the platform."""

BUY = "buy"
SELL = "sell"


class OrderType(str, Enum):
"""Supported order types."""

MARKET = "market"
LIMIT = "limit"


class OrderStatus(str, Enum):
"""Common order status values across brokers."""

NEW = "new"
PARTIALLY_FILLED = "partially_filled"
FILLED = "filled"
CANCELLED = "cancelled"
REJECTED = "rejected"


@dataclass(slots=True)
class OrderRequest:
"""An order request emitted by a strategy or API consumer."""

broker: str
symbol: str
side: OrderSide
quantity: float
order_type: OrderType = OrderType.MARKET
price: Optional[float] = None
client_order_id: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class OrderModification:
"""User intent to update an existing order."""

order_id: str
quantity: Optional[float] = None
price: Optional[float] = None
metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class Order:
"""Normalized order representation shared by all broker gateways."""

broker: str
order_id: str
symbol: str
side: OrderSide
quantity: float
order_type: OrderType
status: OrderStatus
price: Optional[float] = None
filled_quantity: float = 0.0
average_price: Optional[float] = None
client_order_id: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class CancelResult:
"""The result of cancelling an order."""

broker: str
order_id: str
status: OrderStatus
message: Optional[str] = None


@dataclass(slots=True)
class Position:
"""Normalized broker position representation."""

broker: str
symbol: str
quantity: float
average_price: float
metadata: Dict[str, Any] = field(default_factory=dict)
Loading