Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.5.5
current_version = 1.5.6
commit = True
tag = True

Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [1.5.6] - 2025-11-12
### Added
- Allow connection to a subset of plants only

## [1.5.5] - 2025-08-23
### Fixed
- Bug for timestamps with leading zeros for microseconds (by @briandecamp)

## [1.5.4] - 2025-07-28
### Added
- Order book/market depth methods
Expand Down
2 changes: 1 addition & 1 deletion async_rithmic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
from .exceptions import *
from .objects import RetrySettings, ReconnectionSettings

__version__ = '1.5.5'
__version__ = '1.5.6'
33 changes: 32 additions & 1 deletion async_rithmic/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .plants.order import OrderPlant
from .plants.pnl import PnlPlant
from .logger import logger
from .enums import SysInfraType
from .objects import ReconnectionSettings, RetrySettings

def _setup_ssl_context():
Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(
self.on_rithmic_order_notification = Event()
self.on_exchange_order_notification = Event()
self.on_bracket_update = Event()
self.on_trade_route_update = Event()

# Historical data events
self.on_historical_tick = Event()
Expand Down Expand Up @@ -103,9 +105,19 @@ def __init__(
self.on_connected += lambda plant_type: self.plants[plant_type].logger.debug("Connected")
self.on_disconnected += lambda plant_type: self.plants[plant_type].logger.debug("Disconnected")

async def connect(self):
async def connect(self, **kwargs):
target_plants = kwargs.get("plants", [
SysInfraType.ORDER_PLANT,
SysInfraType.HISTORY_PLANT,
SysInfraType.TICKER_PLANT,
SysInfraType.PNL_PLANT
])

try:
for plant in self.plants.values():
if plant.infra_type not in target_plants:
continue

await plant._connect()

await plant._start_background_tasks()
Expand Down Expand Up @@ -138,3 +150,22 @@ async def _disconnect_plant(self, plant):
await plant._stop_background_tasks()
await plant._logout()
await plant._disconnect()

@property
def accounts(self):
return self.plants["order"].accounts

@property
def fcm_id(self):
login_info = self.plants["order"].login_info
return login_info["fcm_id"] if login_info else None

@property
def ib_id(self):
login_info = self.plants["order"].login_info
return login_info["ib_id"] if login_info else None

@property
def user_type(self):
login_info = self.plants["order"].login_info
return login_info["user_type"] if login_info else None
1 change: 1 addition & 0 deletions async_rithmic/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ class DataType(enum.IntEnum):
InstrumentType = pb.request_search_symbols_pb2.RequestSearchSymbols.InstrumentType
SearchPattern = pb.request_search_symbols_pb2.RequestSearchSymbols.Pattern

SysInfraType = pb.request_login_pb2.RequestLogin.SysInfraType
20 changes: 12 additions & 8 deletions async_rithmic/plants/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from .. import protocol_buffers as pb
from ..logger import logger
from ..enums import SysInfraType
from ..exceptions import RithmicErrorResponse
from ..helpers.request_manager import RequestManager
from ..helpers.connectivity import DisconnectionHandler, try_to_reconnect
Expand Down Expand Up @@ -171,10 +172,10 @@ def ssl_context(self):
@property
def plant_type(self):
return {
pb.request_login_pb2.RequestLogin.SysInfraType.HISTORY_PLANT: "history",
pb.request_login_pb2.RequestLogin.SysInfraType.PNL_PLANT: "pnl",
pb.request_login_pb2.RequestLogin.SysInfraType.TICKER_PLANT: "ticker",
pb.request_login_pb2.RequestLogin.SysInfraType.ORDER_PLANT: "order",
SysInfraType.HISTORY_PLANT: "history",
SysInfraType.PNL_PLANT: "pnl",
SysInfraType.TICKER_PLANT: "ticker",
SysInfraType.ORDER_PLANT: "order",
}[self.infra_type]

async def _connect(self):
Expand Down Expand Up @@ -304,6 +305,13 @@ def _build_request(self, **kwargs):
for k, v in kwargs.items():
self._set_pb_field(request, k, v)

if request.DESCRIPTOR.fields_by_name.get("fcm_id"):
request.fcm_id = self.client.fcm_id
if request.DESCRIPTOR.fields_by_name.get("ib_id"):
request.ib_id = self.client.ib_id
if request.DESCRIPTOR.fields_by_name.get("user_type"):
request.user_type = self.client.user_type

return request

async def _send_and_recv_immediate(self, **kwargs):
Expand Down Expand Up @@ -388,10 +396,6 @@ async def _send_and_collect(self, template_id, **kwargs):
account_id = self.client.plants["order"]._get_account_id(**kwargs)
kwargs["account_id"] = account_id

login_info = self.client.plants["order"].login_info
kwargs["fcm_id"] = login_info["fcm_id"]
kwargs["ib_id"] = login_info["ib_id"]

retries = self.client.retry_settings.max_retries
if template_id in [312, 330]:
# Don't retry NewOrder requests
Expand Down
4 changes: 2 additions & 2 deletions async_rithmic/plants/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
from collections import defaultdict

from .base import BasePlant
from ..enums import TimeBarType
from ..enums import SysInfraType, TimeBarType
from .. import protocol_buffers as pb

class HistoryPlant(BasePlant):
infra_type = pb.request_login_pb2.RequestLogin.SysInfraType.HISTORY_PLANT
infra_type = SysInfraType.HISTORY_PLANT

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down
23 changes: 7 additions & 16 deletions async_rithmic/plants/order.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import asyncio

from .base import BasePlant
from ..enums import OrderType, OrderDuration, OrderPlacement, TransactionType
from ..enums import SysInfraType, OrderType, OrderDuration, OrderPlacement, TransactionType
from ..exceptions import InvalidRequestError
from .. import protocol_buffers as pb

BracketType = pb.request_bracket_order_pb2.RequestBracketOrder.BracketType

class OrderPlant(BasePlant):
infra_type = pb.request_login_pb2.RequestLogin.SysInfraType.ORDER_PLANT
infra_type = SysInfraType.ORDER_PLANT

login_info = None
trade_routes = None
Expand Down Expand Up @@ -51,9 +51,6 @@ async def list_accounts(self) -> list:
return await self._send_and_collect(
template_id=302,
expected_response=dict(template_id=303),
fcm_id=self.login_info["fcm_id"],
ib_id=self.login_info["ib_id"],
user_type=self.login_info["user_type"],
account_id=None,
)

Expand All @@ -72,8 +69,6 @@ async def _list_trade_routes(self) -> list:
async def _subscribe_to_updates(self, **kwargs):
for account in self.accounts:
await self._send_and_recv_immediate(
fcm_id=self.login_info["fcm_id"],
ib_id=self.login_info["ib_id"],
account_id=account.account_id,
**kwargs
)
Expand All @@ -82,9 +77,6 @@ async def get_account_rms(self):
return await self._send_and_collect(
template_id=304,
expected_response=dict(template_id=305),
fcm_id=self.login_info["fcm_id"],
ib_id=self.login_info["ib_id"],
user_type=self.login_info["user_type"],
account_id=None,
)

Expand Down Expand Up @@ -273,8 +265,6 @@ async def submit_order(
manual_or_auto=OrderPlacement.MANUAL,
transaction_type=transaction_type,
duration=kwargs["duration"],
fcm_id=self.login_info["fcm_id"],
ib_id=self.login_info["ib_id"],
**msg_kwargs
)

Expand All @@ -300,8 +290,6 @@ async def cancel_order(self, **kwargs):
manual_or_auto=OrderPlacement.MANUAL,
basket_id=basket_id,
account_id=account_id,
fcm_id=self.login_info["fcm_id"],
ib_id=self.login_info["ib_id"],
)

async def cancel_all_orders(self, **kwargs):
Expand All @@ -312,7 +300,6 @@ async def cancel_all_orders(self, **kwargs):
template_id=346,
expected_response=dict(template_id=347),
manual_or_auto=OrderPlacement.MANUAL,
user_type=self.login_info["user_type"],
account_id=self._get_account_id(**kwargs)
)

Expand Down Expand Up @@ -428,7 +415,11 @@ async def _process_response(self, response):
if await super()._process_response(response):
return True

if response.template_id == 351:
if response.template_id == 350:
# Trade route update
await self.client.on_trade_route_update.call_async(response)

elif response.template_id == 351:
# Rithmic order notification
await self.client.on_rithmic_order_notification.call_async(response)

Expand Down
27 changes: 8 additions & 19 deletions async_rithmic/plants/pnl.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,9 @@
from .base import BasePlant
from ..enums import SysInfraType
from .. import protocol_buffers as pb

class PnlPlant(BasePlant):
infra_type = pb.request_login_pb2.RequestLogin.SysInfraType.PNL_PLANT

@property
def _accounts(self):
return self.client.plants["order"].accounts

@property
def _fcm_id(self):
return self.client.plants["order"].login_info["fcm_id"]

@property
def _ib_id(self):
return self.client.plants["order"].login_info["ib_id"]
infra_type = SysInfraType.PNL_PLANT

async def _login(self):
await super()._login()
Expand All @@ -28,11 +17,11 @@ async def subscribe_to_pnl_updates(self):
"""
self._subscriptions["pnl"].add(1)

for account in self._accounts:
for account in self.client.accounts:
await self._send_request(
template_id=400,
fcm_id=self._fcm_id,
ib_id=self._ib_id,
fcm_id=self.client.fcm_id,
ib_id=self.client.ib_id,
account_id=account.account_id,
request=pb.request_pnl_position_updates_pb2.RequestPnLPositionUpdates.Request.SUBSCRIBE
)
Expand All @@ -43,11 +32,11 @@ async def unsubscribe_from_pnl_updates(self):
"""
self._subscriptions["pnl"].discard(1)

for account in self._accounts:
for account in self.client.accounts:
await self._send_request(
template_id=400,
fcm_id=self._fcm_id,
ib_id=self._ib_id,
fcm_id=self.client.fcm_id,
ib_id=self.client.ib_id,
account_id=account.account_id,
request=pb.request_pnl_position_updates_pb2.RequestPnLPositionUpdates.Request.UNSUBSCRIBE
)
Expand Down
4 changes: 2 additions & 2 deletions async_rithmic/plants/ticker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from typing import Union

from .base import BasePlant
from ..enums import DataType, SearchPattern
from ..enums import SysInfraType, DataType, SearchPattern
from .. import protocol_buffers as pb

class TickerPlant(BasePlant):
infra_type = pb.request_login_pb2.RequestLogin.SysInfraType.TICKER_PLANT
infra_type = SysInfraType.TICKER_PLANT

async def _login(self):
await super()._login()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "async_rithmic"
version = "1.5.5"
version = "1.5.6"
description = "Python API Integration with Rithmic Protocol Buffer API"
readme = "README.md"
requires-python = ">=3.10"
Expand Down