Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@
docker
*.code-workspace
.dockerignore
htmlcov
htmlcov
.env
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
LOG_LEVEL="NOTSET"
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.coverage
.venv
**/__pycache__/*
**/__pycache__/*
**/*.log
7 changes: 7 additions & 0 deletions pytrade/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pytrade.interfaces.client import IClient
from pytrade.interfaces.data import IInstrumentData
from pytrade.interfaces.position import IPosition
from pytrade.logging import get_logger
from pytrade.models import Order


Expand All @@ -16,6 +17,7 @@ def __init__(self, client: IClient):
self._orders: List[Order] = []
self._data_context = CandleData(max_size=100)
self._subscriptions: list[Tuple[Instrument, Granularity]] = []
self.logger = get_logger()

@property
def equity(self) -> float:
Expand All @@ -39,15 +41,18 @@ def order(self, order: Order):
self._orders.append(order)

def process_orders(self):
self.logger.debug(f"Processing {len(self._orders)} orders.")
for order in self._orders:
self.client.order(order)

self._orders.clear()
self.logger.debug("Orders cleared.")

def load_instrument_candles(
self, instrument: Instrument, granularity: Granularity, count: int
):
key = (instrument, granularity)
self.logger.debug(f"Loading candles for {key}")

if key in self._subscriptions:
raise RuntimeError(
Expand All @@ -59,6 +64,7 @@ def load_instrument_candles(
if len(instrument_data.df) < count:
instrument_data.clear()
candles = self.client.get_candles(instrument, granularity, count)
self.logger.debug(f"Loading candles for {instrument_data}.")
for candle in candles:
instrument_data.update(candle)

Expand All @@ -67,6 +73,7 @@ def subscribe(
) -> IInstrumentData:

key = (instrument, granularity)
self.logger.debug(f"Subscribing to candles for {key}")

instrument_data = self._data_context.get(instrument, granularity)

Expand Down
27 changes: 24 additions & 3 deletions pytrade/data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from datetime import timedelta
from datetime import datetime, timedelta, timezone
from typing import Optional

import pandas as pd
Expand All @@ -8,6 +8,7 @@
from pytrade.events.event import Event
from pytrade.instruments import Candlestick, Granularity, Instrument
from pytrade.interfaces.data import IDataContext, IInstrumentData
from pytrade.logging import get_logger

COLUMNS = ["datetime", "instrument", "open", "high", "low", "close"]

Expand All @@ -33,6 +34,13 @@ def __init__(
self.__instrument: Optional[Instrument] = None
self.__granularity: Optional[Granularity] = None
self.__update_event = Event()
self.logger = get_logger()

def __str__(self):
return (
f"<InstrumentCandles instrument={self.__instrument} granularity={self.__granularity}"
f"max_size={self._max_size} timestamp={self.timestamp}>"
)

@property
def df(self):
Expand All @@ -48,7 +56,11 @@ def granularity(self):

@property
def timestamp(self) -> Timestamp:
return self._data.index[-1]
return (
self._data.index[-1]
if len(self._data) > 0
else pd.Timestamp(datetime.min, tzinfo=timezone.utc)
)

@property
def on_update(self):
Expand Down Expand Up @@ -76,13 +88,18 @@ def update(self, candlestick: Candlestick):
f"Received {candlestick.granularity} for history[{self.__granularity}]"
)

if len(self._data) > 0 and self._data.index[-1] == candlestick.timestamp:
self.logger.warning(
f"Received an duplicate update for {candlestick.instrument}[{candlestick.timestamp}]"
)

if self._max_size and len(self._data) >= self._max_size:
_delta = self._data.index[-1] - self._data.index[-2]
if not isinstance(_delta, timedelta):
raise RuntimeError(
"Expected dataframe to have DatetimeInex. Unable to caluclate timedelta from index."
)
if self._data.index[-1] >= candlestick.timestamp:
if self._data.index[-1] > candlestick.timestamp:
raise RuntimeError(
f"Received a candle update for on outdated timestamp. Update \
time {candlestick.timestamp} but dataframe is at {self._data.index[-1]}"
Expand Down Expand Up @@ -110,6 +127,10 @@ def __init__(self, max_size=1000):
self._data: dict[tuple[Instrument, Granularity], InstrumentCandles] = {}
self._max_size = max_size
self._update_event = asyncio.Event()
self.logger = get_logger()

def __str__(self):
return f"<CandleData max_size={self._max_size}>"

@property
def universe(self) -> list[IInstrumentData]:
Expand Down
3 changes: 3 additions & 0 deletions pytrade/indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ def __init__(self, data: IInstrumentData):
data.on_update += self._update
self._values = self._run()

def __str__(self):
return f"<{self.__class__.__name__} value={self.value}>"

def _update(self):
self._values = self._run()

Expand Down
12 changes: 12 additions & 0 deletions pytrade/instruments.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ def __instrument_eq__(self, other: "CandleSubscription"):

return result

def __str__(self):
return f"<{self.__class__.__name__} {self.instrument}[{self.granularity}]>"


class Candlestick:

Expand All @@ -132,6 +135,12 @@ def __init__(
self.close = close
self.timestamp = timestamp

def __str__(self):
return (
f"<{self.__class__.__name__} instrument={self.instrument} granularity={self.granularity} "
f"O: {self.open} H: {self.high} L: {self.low} C:{self.close} Time: {self.timestamp}>"
)

def to_dict(self):
return {
"datetime": self.timestamp,
Expand All @@ -158,3 +167,6 @@ def __init__(self, instrument: str, timestamp: str, bid: str, ask: str):
)
self.bid = float(bid)
self.ask = float(ask)

def __str__(self):
return f"<{self.__class__.__name__} {self.instrument} bid={self.bid:.5f} ask={self.ask:.5f}>"
37 changes: 37 additions & 0 deletions pytrade/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import inspect
import logging
import os

format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(format)
log_level = getattr(logging, os.environ.get("LOG_LEVEL", "INFO").upper())

stream_handler = logging.StreamHandler()
stream_handler.setLevel(log_level)
stream_handler.setFormatter(formatter)

# Create a file handler to write logs to a file
file_handler = logging.FileHandler("app.log")
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)


def get_logger():
stack = inspect.stack()
parentframe = stack[1][0]

logger_name = "DEFAULT"

if "self" in parentframe.f_locals:
logger_name = parentframe.f_locals["self"].__class__.__name__
else:
module_info = inspect.getmodule(parentframe)
if module_info:
logger_name = module_info.__name__

del parentframe
_logger = logging.getLogger(logger_name)
_logger.setLevel(log_level)
_logger.addHandler(stream_handler)
_logger.addHandler(file_handler)
return _logger
11 changes: 10 additions & 1 deletion pytrade/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ def __init__(
self._trailing_stop_loss_on_fill: Optional[float] = trailing_stop_loss_on_fill
self.__parent_trade = parent_trade

def __str__(self):

return (
f"<Order instrument={self._instrument} size={self._size} "
f"stop={self._stop} limit={self._limit} price_bound={self._price_bound}"
f"time_in_force={self._time_in_force} tp={self.take_profit_on_fill} sl={self.stop_loss_on_fill}"
f"trailing_sl={self._trailing_stop_loss_on_fill} parent_trade={self.__parent_trade}>"
)

def __eq__(self, other: Any):
return other is self

Expand Down Expand Up @@ -134,7 +143,7 @@ def __init__(
self.__tp_order: Optional[Order] = None
self.__tag: Optional[str] = tag

def __repr__(self): # pragma: no cover
def __str__(self): # pragma: no cover
return (
f'<Trade size={self.__size} time={self.__entry_time}-{self.__exit_time or ""} '
f'price={self.__entry_price}-{self.__exit_price or ""} pl={self.pl:.0f}'
Expand Down
46 changes: 36 additions & 10 deletions pytrade/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
)
from pytrade.interfaces.broker import IBroker
from pytrade.interfaces.data import IDataContext, IInstrumentData
from pytrade.logging import get_logger
from pytrade.models import Order, TimeInForce


Expand All @@ -24,14 +25,17 @@ def __init__(self, broker: IBroker, data_context: IDataContext):
self._updates_complete = asyncio.Event()
self._data_context = data_context
self._pending_updates: list[CandleSubscription] = []
self.logger = get_logger()

def init(self) -> None:
self.logger.info("Initializing strategy.")
self._caluclate_updates()
# Call init first incase any indicators preload data for initial signals
self._init()
self._monitor_instruments()

def _caluclate_updates(self) -> None:
self.logger.debug("Calculating update intervals.")
self._required_updates: list[CandleSubscription] = []
max_interval = 0
_max_granularity = None
Expand All @@ -51,6 +55,11 @@ def _caluclate_updates(self) -> None:
# Just set to min and let the first update set it correctly
self._next_timestamp = Timestamp.min.replace(tzinfo=timezone.utc)

self.logger.debug(
f"Set updates to [{str.join(" ", (str(update) for update in self._required_updates))}] \
and next update time to {self._next_timestamp}"
)

@property
@abstractmethod
def subscriptions(self) -> list[CandleSubscription]:
Expand All @@ -61,36 +70,53 @@ def subscriptions(self) -> list[CandleSubscription]:
raise NotImplementedError()

def _monitor_instruments(self) -> None:
self.logger.debug("Subscribing to instruments.")
for subscription in self.subscriptions:
instrument_data = self.broker.subscribe(
subscription.instrument, subscription.granularity
)
instrument_data.on_update += self._update_callback(instrument_data)
self.logger.debug(f"Subscribed to {subscription}.")
self.logger.debug("Subscriptions complete")

def _update_callback(self, data: IInstrumentData):
return lambda: self._handle_update(data)

def _handle_update(self, data: IInstrumentData) -> None:
self.logger.debug(f"Received upate for {data}")
# If data was missed move to the next update window based on data
if data.timestamp > self._next_timestamp:
self.logger.debug(
f"Received an update for {data} that is past the expected update time {self._next_timestamp}."
)
self._next_timestamp = data.timestamp.ceil(freq=self._update_frequency)
self._pending_updates = self._required_updates.copy()
self.logger.debug(
f"Updated next update time to {self._next_timestamp} and reset pending updates." # nosec
)

if data.timestamp == self._next_timestamp:
self.logger.debug(f"Removing update for {data}.")
self._pending_updates.remove(
CandleSubscription(data.instrument, data.granularity)
)

# Filter out update from pending
if not self._pending_updates:
self.logger.debug(f"Updates for {self} compelete.")
self._updates_complete.set()
self._next_timestamp + timedelta(minutes=self._update_minutes)
self.logger.debug(f"Updated next timestamp to {self._next_timestamp}.")

self.logger.debug(f"Done handling upate for {data}.")

def next(self) -> None:
if self._updates_complete.is_set():
self.logger.debug("Update complete. Running indicators.")
self._updates_complete.clear()
self._next()
self._pending_updates = self._required_updates.copy()
self.logger.debug("Strategy iteration complete. Reset updates.")

def get_data(
self, instrument: Instrument, granularity: Granularity
Expand Down Expand Up @@ -121,17 +147,17 @@ def buy(
tp=None,
sl=None,
) -> None:
self.broker.order(
Order(
instrument,
size,
stop,
limit,
time_in_force=time_in_force,
take_profit_on_fill=tp,
stop_loss_on_fill=sl,
)
order = Order(
instrument,
size,
stop,
limit,
time_in_force=time_in_force,
take_profit_on_fill=tp,
stop_loss_on_fill=sl,
)
self.logger.info(f"Placing order {order}")
self.broker.order(order)

def sell(
self,
Expand Down