diff --git a/.dockerignore b/.dockerignore index 2764e14..5e6f39f 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,4 +5,5 @@ docker *.code-workspace .dockerignore -htmlcov \ No newline at end of file +htmlcov +.env \ No newline at end of file diff --git a/.env b/.env new file mode 100644 index 0000000..7747204 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +LOG_LEVEL="NOTSET" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 0a77a8d..47e20bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .coverage .venv -**/__pycache__/* \ No newline at end of file +**/__pycache__/* +**/*.log \ No newline at end of file diff --git a/pytrade/broker.py b/pytrade/broker.py index 36b3dd2..1bb11ba 100644 --- a/pytrade/broker.py +++ b/pytrade/broker.py @@ -6,6 +6,7 @@ from pytrade.interfaces.client import IClient from pytrade.interfaces.data import IInstrumentData from pytrade.interfaces.position import IPosition +from pytrade.logging import get_logger from pytrade.models import Order @@ -16,6 +17,7 @@ def __init__(self, client: IClient): self._orders: List[Order] = [] self._data_context = CandleData(max_size=100) self._subscriptions: list[Tuple[Instrument, Granularity]] = [] + self.logger = get_logger() @property def equity(self) -> float: @@ -39,15 +41,18 @@ def order(self, order: Order): self._orders.append(order) def process_orders(self): + self.logger.debug(f"Processing {len(self._orders)} orders.") for order in self._orders: self.client.order(order) self._orders.clear() + self.logger.debug("Orders cleared.") def load_instrument_candles( self, instrument: Instrument, granularity: Granularity, count: int ): key = (instrument, granularity) + self.logger.debug(f"Loading candles for {key}") if key in self._subscriptions: raise RuntimeError( @@ -59,6 +64,7 @@ def load_instrument_candles( if len(instrument_data.df) < count: instrument_data.clear() candles = self.client.get_candles(instrument, granularity, count) + self.logger.debug(f"Loading candles for {instrument_data}.") for candle in candles: instrument_data.update(candle) @@ -67,6 +73,7 @@ def subscribe( ) -> IInstrumentData: key = (instrument, granularity) + self.logger.debug(f"Subscribing to candles for {key}") instrument_data = self._data_context.get(instrument, granularity) diff --git a/pytrade/data.py b/pytrade/data.py index 58a2638..c0777da 100644 --- a/pytrade/data.py +++ b/pytrade/data.py @@ -1,5 +1,5 @@ import asyncio -from datetime import timedelta +from datetime import datetime, timedelta, timezone from typing import Optional import pandas as pd @@ -8,6 +8,7 @@ from pytrade.events.event import Event from pytrade.instruments import Candlestick, Granularity, Instrument from pytrade.interfaces.data import IDataContext, IInstrumentData +from pytrade.logging import get_logger COLUMNS = ["datetime", "instrument", "open", "high", "low", "close"] @@ -33,6 +34,13 @@ def __init__( self.__instrument: Optional[Instrument] = None self.__granularity: Optional[Granularity] = None self.__update_event = Event() + self.logger = get_logger() + + def __str__(self): + return ( + f"" + ) @property def df(self): @@ -48,7 +56,11 @@ def granularity(self): @property def timestamp(self) -> Timestamp: - return self._data.index[-1] + return ( + self._data.index[-1] + if len(self._data) > 0 + else pd.Timestamp(datetime.min, tzinfo=timezone.utc) + ) @property def on_update(self): @@ -76,13 +88,18 @@ def update(self, candlestick: Candlestick): f"Received {candlestick.granularity} for history[{self.__granularity}]" ) + if len(self._data) > 0 and self._data.index[-1] == candlestick.timestamp: + self.logger.warning( + f"Received an duplicate update for {candlestick.instrument}[{candlestick.timestamp}]" + ) + if self._max_size and len(self._data) >= self._max_size: _delta = self._data.index[-1] - self._data.index[-2] if not isinstance(_delta, timedelta): raise RuntimeError( "Expected dataframe to have DatetimeInex. Unable to caluclate timedelta from index." ) - if self._data.index[-1] >= candlestick.timestamp: + if self._data.index[-1] > candlestick.timestamp: raise RuntimeError( f"Received a candle update for on outdated timestamp. Update \ time {candlestick.timestamp} but dataframe is at {self._data.index[-1]}" @@ -110,6 +127,10 @@ def __init__(self, max_size=1000): self._data: dict[tuple[Instrument, Granularity], InstrumentCandles] = {} self._max_size = max_size self._update_event = asyncio.Event() + self.logger = get_logger() + + def __str__(self): + return f"" @property def universe(self) -> list[IInstrumentData]: diff --git a/pytrade/indicator.py b/pytrade/indicator.py index bdde78b..ad90b92 100644 --- a/pytrade/indicator.py +++ b/pytrade/indicator.py @@ -12,6 +12,9 @@ def __init__(self, data: IInstrumentData): data.on_update += self._update self._values = self._run() + def __str__(self): + return f"<{self.__class__.__name__} value={self.value}>" + def _update(self): self._values = self._run() diff --git a/pytrade/instruments.py b/pytrade/instruments.py index ada99ed..581983a 100644 --- a/pytrade/instruments.py +++ b/pytrade/instruments.py @@ -111,6 +111,9 @@ def __instrument_eq__(self, other: "CandleSubscription"): return result + def __str__(self): + return f"<{self.__class__.__name__} {self.instrument}[{self.granularity}]>" + class Candlestick: @@ -132,6 +135,12 @@ def __init__( self.close = close self.timestamp = timestamp + def __str__(self): + return ( + f"<{self.__class__.__name__} instrument={self.instrument} granularity={self.granularity} " + f"O: {self.open} H: {self.high} L: {self.low} C:{self.close} Time: {self.timestamp}>" + ) + def to_dict(self): return { "datetime": self.timestamp, @@ -158,3 +167,6 @@ def __init__(self, instrument: str, timestamp: str, bid: str, ask: str): ) self.bid = float(bid) self.ask = float(ask) + + def __str__(self): + return f"<{self.__class__.__name__} {self.instrument} bid={self.bid:.5f} ask={self.ask:.5f}>" diff --git a/pytrade/logging.py b/pytrade/logging.py index e69de29..686ad94 100644 --- a/pytrade/logging.py +++ b/pytrade/logging.py @@ -0,0 +1,37 @@ +import inspect +import logging +import os + +format = "%(asctime)s - %(levelname)s - %(message)s" +formatter = logging.Formatter(format) +log_level = getattr(logging, os.environ.get("LOG_LEVEL", "INFO").upper()) + +stream_handler = logging.StreamHandler() +stream_handler.setLevel(log_level) +stream_handler.setFormatter(formatter) + +# Create a file handler to write logs to a file +file_handler = logging.FileHandler("app.log") +file_handler.setLevel(log_level) +file_handler.setFormatter(formatter) + + +def get_logger(): + stack = inspect.stack() + parentframe = stack[1][0] + + logger_name = "DEFAULT" + + if "self" in parentframe.f_locals: + logger_name = parentframe.f_locals["self"].__class__.__name__ + else: + module_info = inspect.getmodule(parentframe) + if module_info: + logger_name = module_info.__name__ + + del parentframe + _logger = logging.getLogger(logger_name) + _logger.setLevel(log_level) + _logger.addHandler(stream_handler) + _logger.addHandler(file_handler) + return _logger diff --git a/pytrade/models.py b/pytrade/models.py index 3ce9ebb..5502f13 100644 --- a/pytrade/models.py +++ b/pytrade/models.py @@ -44,6 +44,15 @@ def __init__( self._trailing_stop_loss_on_fill: Optional[float] = trailing_stop_loss_on_fill self.__parent_trade = parent_trade + def __str__(self): + + return ( + f"" + ) + def __eq__(self, other: Any): return other is self @@ -134,7 +143,7 @@ def __init__( self.__tp_order: Optional[Order] = None self.__tag: Optional[str] = tag - def __repr__(self): # pragma: no cover + def __str__(self): # pragma: no cover return ( f' None: + self.logger.info("Initializing strategy.") self._caluclate_updates() # Call init first incase any indicators preload data for initial signals self._init() self._monitor_instruments() def _caluclate_updates(self) -> None: + self.logger.debug("Calculating update intervals.") self._required_updates: list[CandleSubscription] = [] max_interval = 0 _max_granularity = None @@ -51,6 +55,11 @@ def _caluclate_updates(self) -> None: # Just set to min and let the first update set it correctly self._next_timestamp = Timestamp.min.replace(tzinfo=timezone.utc) + self.logger.debug( + f"Set updates to [{str.join(" ", (str(update) for update in self._required_updates))}] \ +and next update time to {self._next_timestamp}" + ) + @property @abstractmethod def subscriptions(self) -> list[CandleSubscription]: @@ -61,36 +70,53 @@ def subscriptions(self) -> list[CandleSubscription]: raise NotImplementedError() def _monitor_instruments(self) -> None: + self.logger.debug("Subscribing to instruments.") for subscription in self.subscriptions: instrument_data = self.broker.subscribe( subscription.instrument, subscription.granularity ) instrument_data.on_update += self._update_callback(instrument_data) + self.logger.debug(f"Subscribed to {subscription}.") + self.logger.debug("Subscriptions complete") def _update_callback(self, data: IInstrumentData): return lambda: self._handle_update(data) def _handle_update(self, data: IInstrumentData) -> None: + self.logger.debug(f"Received upate for {data}") # If data was missed move to the next update window based on data if data.timestamp > self._next_timestamp: + self.logger.debug( + f"Received an update for {data} that is past the expected update time {self._next_timestamp}." + ) self._next_timestamp = data.timestamp.ceil(freq=self._update_frequency) self._pending_updates = self._required_updates.copy() + self.logger.debug( + f"Updated next update time to {self._next_timestamp} and reset pending updates." # nosec + ) if data.timestamp == self._next_timestamp: + self.logger.debug(f"Removing update for {data}.") self._pending_updates.remove( CandleSubscription(data.instrument, data.granularity) ) # Filter out update from pending if not self._pending_updates: + self.logger.debug(f"Updates for {self} compelete.") self._updates_complete.set() self._next_timestamp + timedelta(minutes=self._update_minutes) + self.logger.debug(f"Updated next timestamp to {self._next_timestamp}.") + + self.logger.debug(f"Done handling upate for {data}.") def next(self) -> None: if self._updates_complete.is_set(): + self.logger.debug("Update complete. Running indicators.") self._updates_complete.clear() self._next() self._pending_updates = self._required_updates.copy() + self.logger.debug("Strategy iteration complete. Reset updates.") def get_data( self, instrument: Instrument, granularity: Granularity @@ -121,17 +147,17 @@ def buy( tp=None, sl=None, ) -> None: - self.broker.order( - Order( - instrument, - size, - stop, - limit, - time_in_force=time_in_force, - take_profit_on_fill=tp, - stop_loss_on_fill=sl, - ) + order = Order( + instrument, + size, + stop, + limit, + time_in_force=time_in_force, + take_profit_on_fill=tp, + stop_loss_on_fill=sl, ) + self.logger.info(f"Placing order {order}") + self.broker.order(order) def sell( self,