diff --git a/examples/basic_order_bot.py b/examples/basic_order_bot.py new file mode 100644 index 000000000..51575cc39 --- /dev/null +++ b/examples/basic_order_bot.py @@ -0,0 +1,263 @@ +# from pprint import pformat +from functools import partial +from decimal import Decimal +from typing import Callable + +import tractor +import trio +from uuid import uuid4 + +from piker.service import maybe_open_pikerd +from piker.accounting import dec_digits +from piker.clearing import ( + open_ems, + OrderClient, +) +# TODO: we should probably expose these top level in this subsys? +from piker.clearing._messages import ( + Order, + Status, + BrokerdPosition, +) +from piker.data import ( + iterticks, + Flume, + open_feed, + Feed, + # ShmArray, +) + + +# TODO: handle other statuses: +# - fills, errors, and position tracking +async def wait_for_order_status( + trades_stream: tractor.MsgStream, + oid: str, + expect_status: str, + +) -> tuple[ + list[Status], + list[BrokerdPosition], +]: + ''' + Wait for a specific order status for a given dialog, return msg flow + up to that msg and any position update msgs in a tuple. + + ''' + # Wait for position message before moving on to verify flow(s) + # for the multi-order position entry/exit. + status_msgs: list[Status] = [] + pp_msgs: list[BrokerdPosition] = [] + + async for msg in trades_stream: + match msg: + case {'name': 'position'}: + ppmsg = BrokerdPosition(**msg) + pp_msgs.append(ppmsg) + + case { + 'name': 'status', + }: + msg = Status(**msg) + status_msgs.append(msg) + + # if we get the status we expect then return all + # collected msgs from the brokerd dialog up to the + # exected msg B) + if ( + msg.resp == expect_status + and msg.oid == oid + ): + return status_msgs, pp_msgs + + +async def bot_main(): + ''' + Boot the piker runtime, open an ems connection, submit + and process orders statuses in real-time. + + ''' + ll: str = 'info' + + # open an order ctl client, live data feed, trio nursery for + # spawning an order trailer task + client: OrderClient + trades_stream: tractor.MsgStream + feed: Feed + accounts: list[str] + + fqme: str = 'btcusdt.usdtm.perp.binance' + + async with ( + + # TODO: do this implicitly inside `open_ems()` ep below? + # init and sync actor-service runtime + maybe_open_pikerd( + loglevel=ll, + debug_mode=True, + + ), + open_ems( + fqme, + mode='paper', # {'live', 'paper'} + # mode='live', # for real-brokerd submissions + loglevel=ll, + + ) as ( + client, # OrderClient + trades_stream, # tractor.MsgStream startup_pps, + _, # positions + accounts, + _, # dialogs + ), + + open_feed( + fqmes=[fqme], + loglevel=ll, + + # TODO: if you want to throttle via downsampling + # how many tick updates your feed received on + # quote streams B) + # tick_throttle=10, + ) as feed, + + trio.open_nursery() as tn, + ): + assert accounts + print(f'Loaded binance accounts: {accounts}') + + flume: Flume = feed.flumes[fqme] + min_tick = Decimal(flume.mkt.price_tick) + min_tick_digits: int = dec_digits(min_tick) + price_round: Callable = partial( + round, + ndigits=min_tick_digits, + ) + + quote_stream: trio.abc.ReceiveChannel = feed.streams['binance'] + + + # always keep live limit 0.003% below last + # clearing price + clear_margin: float = 0.9997 + + async def trailer( + order: Order, + ): + # ref shm OHLCV array history, if you want + # s_shm: ShmArray = flume.rt_shm + # m_shm: ShmArray = flume.hist_shm + + # NOTE: if you wanted to frame ticks by type like the + # the quote throttler does.. and this is probably + # faster in terms of getting the latest tick type + # embedded value of interest? + # from piker.data._sampling import frame_ticks + + async for quotes in quote_stream: + for fqme, quote in quotes.items(): + # print( + # f'{quote["symbol"]} -> {quote["ticks"]}\n' + # f'last 1s OHLC:\n{s_shm.array[-1]}\n' + # f'last 1m OHLC:\n{m_shm.array[-1]}\n' + # ) + + for tick in iterticks( + quote, + reverse=True, + # types=('trade', 'dark_trade'), # defaults + ): + + await client.update( + uuid=order.oid, + price=price_round( + clear_margin + * + tick['price'] + ), + ) + msgs, pps = await wait_for_order_status( + trades_stream, + order.oid, + 'open' + ) + # if multiple clears per quote just + # skip to the next quote? + break + + + # get first live quote to be sure we submit the initial + # live buy limit low enough that it doesn't clear due to + # a stale initial price from the data feed layer! + first_ask_price: float | None = None + async for quotes in quote_stream: + for fqme, quote in quotes.items(): + # print(quote['symbol']) + for tick in iterticks(quote, types=('ask')): + first_ask_price: float = tick['price'] + break + + if first_ask_price: + break + + # setup order dialog via first msg + price: float = price_round( + clear_margin + * + first_ask_price, + ) + + # compute a 1k USD sized pos + size: float = round(1e3/price, ndigits=3) + + order = Order( + + # docs on how this all works, bc even i'm not entirely + # clear XD. also we probably want to figure out how to + # offer both the paper engine running and the brokerd + # order ctl tasks with the ems choosing which stream to + # route msgs on given the account value! + account='paper', # use built-in paper clearing engine and .accounting + # account='binance.usdtm', # for live binance futes + + oid=str(uuid4()), + exec_mode='live', # {'dark', 'live', 'alert'} + + action='buy', # TODO: remove this from our schema? + + size=size, + symbol=fqme, + price=price, + brokers=['binance'], + ) + await client.send(order) + + msgs, pps = await wait_for_order_status( + trades_stream, + order.oid, + 'open', + ) + + assert not pps + assert msgs[-1].oid == order.oid + + # start "trailer task" which tracks rt quote stream + tn.start_soon(trailer, order) + + try: + # wait for ctl-c from user.. + await trio.sleep_forever() + except KeyboardInterrupt: + # cancel the open order + await client.cancel(order.oid) + + msgs, pps = await wait_for_order_status( + trades_stream, + order.oid, + 'canceled' + ) + raise + + +if __name__ == '__main__': + trio.run(bot_main) diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index e6a4cbc9b..98457ace4 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -357,6 +357,7 @@ async def _api( # Check if we're configured to route order requests to the # venue equivalent's testnet. + use_testnet: bool = False section_name: str = self.venue2configkey[venue_key] if subconf := self.conf.get(section_name): use_testnet = ( diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 067a21636..f063bee11 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -254,7 +254,7 @@ async def open_trade_dialog( wss: NoBsWs async with ( client.manage_listen_key() as listen_key, - open_autorecon_ws(f'{wss_url}/ws/{listen_key}') as wss, + open_autorecon_ws(f'{wss_url}/?listenKey={listen_key}') as wss, ): nsid: int = time_ns() await wss.send_msg({ diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index 6c9afc779..fe822dd0d 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -45,8 +45,8 @@ # 'wss://ws-api.binance.com:443/ws-api/v3', # https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams -_futes_ws: str = f'wss://fstream.{_domain}/ws/' -_auth_futes_ws: str = 'wss://fstream-auth.{_domain}/ws/' +_futes_ws: str = f'wss://fstream.{_domain}/ws' +_auth_futes_ws: str = 'wss://fstream-auth.{_domain}/ws' # test nets # NOTE: spot test network only allows certain ep sets: @@ -58,7 +58,7 @@ # 'wss://testnet.binance.vision/ws-api/v3' _testnet_futes_url: str = 'https://testnet.binancefuture.com' -_testnet_futes_ws: str = 'wss://stream.binancefuture.com' +_testnet_futes_ws: str = 'wss://stream.binancefuture.com/ws' MarketType = Literal[