diff --git a/Makefile b/Makefile
index f64e0ec..2fed741 100644
--- a/Makefile
+++ b/Makefile
@@ -1,10 +1,103 @@
+# Makefile for managing Conda environment and Docker Compose services
 .ONESHELL:
-.PHONY: uninstall
-.PHONY: install
+.PHONY: uninstall install start stop clean rebuild
+
+# Conda environment management
+ENV_NAME = quants-lab
 
 uninstall:
-	conda env remove -n quants-lab
+	@echo "Removing Conda environment: $(ENV_NAME)"
+	conda env remove -n $(ENV_NAME)
 
 install:
+	@echo "Creating Conda environment from environment.yml"
 	conda env create -f environment.yml
+
+# Docker Compose services
+COMPOSE_DB = docker compose -f docker-compose-db.yml
+COMPOSE_TASKS = docker compose -f docker-compose-tasks.yml
+
+start:
+	@if [ "$(SERVICE)" ]; then \
+		$(COMPOSE_DB) up -d $(SERVICE) || $(COMPOSE_TASKS) up -d $(SERVICE); \
+	else \
+		echo "Please specify a SERVICE to start. Use 'make start SERVICE=<service>'"; \
+	fi
+
+stop:
+	@if [ "$(SERVICE)" ]; then \
+		$(COMPOSE_DB) down $(SERVICE) || $(COMPOSE_TASKS) down $(SERVICE); \
+	else \
+		echo "Please specify a SERVICE to stop. Use 'make stop SERVICE=<service>'"; \
+	fi
+
+# Convenience shortcuts for database services
+start-timescaledb:
+	$(COMPOSE_DB) up -d timescaledb
+
+stop-timescaledb:
+	$(COMPOSE_DB) down timescaledb
+
+start-optunadb:
+	$(COMPOSE_DB) up -d optunadb
+
+stop-optunadb:
+	$(COMPOSE_DB) down optunadb
+
+# Convenience shortcuts for task runners
+start-trades:
+	$(COMPOSE_TASKS) up -d data-generation-runner
+
+stop-trades:
+	$(COMPOSE_TASKS) down data-generation-runner
+
+start-candles:
+	$(COMPOSE_TASKS) up -d candles-downloader-runner
+
+stop-candles:
+	$(COMPOSE_TASKS) down candles-downloader-runner
+
+start-report:
+	$(COMPOSE_TASKS) up -d screeners-report-runner
+
+stop-report:
+	$(COMPOSE_TASKS) down screeners-report-runner
+
+start-backtesting:
+	$(COMPOSE_TASKS) up -d backtesting-runner
+
+stop-backtesting:
+	$(COMPOSE_TASKS) down backtesting-runner
+
+# Clean and rebuild all containers
+clean:
+	$(COMPOSE_DB) down -v
+	$(COMPOSE_TASKS) down -v
+
+rebuild:
+	$(COMPOSE_DB) down --rmi all -v
+	$(COMPOSE_TASKS) down --rmi all -v
+	$(COMPOSE_DB) up --build -d
+	$(COMPOSE_TASKS) up --build -d
+
+# Usage help
+help:
+	@echo "Available targets:"
+	@echo "  install                  - Create Conda environment from environment.yml"
+	@echo "  uninstall                - Remove Conda environment"
+	@echo "  start SERVICE=<service>  - Start a specific service"
+	@echo "  stop SERVICE=<service>   - Stop a specific service"
+	@echo "  start-optunadb           - Start optuna database"
+	@echo "  stop-optunadb            - Stop optuna database"
+	@echo "  start-timescaledb        - Start timescale database"
+	@echo "  stop-timescaledb         - Stop timescale database"
+	@echo "  start-trades             - Start trades task runner"
+	@echo "  stop-trades              - Stop trades task runner"
+	@echo "  start-candles            - Start candles downloader"
+	@echo "  stop-candles             - Stop candles downloader"
+	@echo "  start-report             - Start report generator"
+	@echo "  stop-report              - Stop report generator"
+	@echo "  start-backtesting        - Start backtesting"
+	@echo "  stop-backtesting         - Stop backtesting"
+	@echo "  clean                    - Remove all containers and volumes"
+	@echo "  rebuild                  - Rebuild and start all services"
\ No newline at end of file
diff --git a/core/data_sources/clob.py b/core/data_sources/clob.py
index db6094a..6297e64 100644
--- a/core/data_sources/clob.py
+++ b/core/data_sources/clob.py
@@ -5,6 +5,7 @@
 from typing import Dict, List, Optional, Tuple
 
 import pandas as pd
+from bidict import bidict
 from hummingbot.client.config.client_config_map import ClientConfigMap
 from hummingbot.client.config.config_helpers import ClientConfigAdapter, get_connector_class
 from hummingbot.client.settings import AllConnectorSettings, ConnectorType
@@ -75,15 +76,14 @@ def get_candles_from_cache(self,
         else:
             return None
 
-
-
     async def get_candles(self,
                           connector_name: str,
                           trading_pair: str,
                           interval: str,
                           start_time: int,
                           end_time: int,
-                          from_trades: bool = False) -> Candles:
+                          from_trades: bool = False,
+                          max_trades_per_call: int = 1_000_000) -> Candles:
         cache_key = (connector_name, trading_pair, interval)
 
         if cache_key in self._candles_cache:
@@ -111,9 +111,19 @@ async def get_candles(self,
         try:
             logger.info(f"Fetching data for {connector_name} {trading_pair} {interval} from {new_start_time} to {new_end_time}")
             if from_trades:
-                trades = await self.get_trades(connector_name, trading_pair, new_start_time, new_end_time)
+                all_trades = pd.DataFrame()
+                async for trades in self.yield_trades_chunk(connector_name=connector_name,
+                                                            trading_pair=trading_pair,
+                                                            start_time=new_start_time,
+                                                            end_time=new_end_time,
+                                                            max_trades_per_call=max_trades_per_call):
+                    if trades.empty:
+                        break
+                    trades["connector_name"] = connector_name
+                    trades["trading_pair"] = trading_pair
+                    all_trades = pd.concat([all_trades, trades])
                 pandas_interval = self.convert_interval_to_pandas_freq(interval)
-                candles_df = trades.resample(pandas_interval).agg({"price": "ohlc", "volume": "sum"}).ffill()
+                candles_df = all_trades.resample(pandas_interval).agg({"price": "ohlc", "volume": "sum"}).ffill()
                 candles_df.columns = candles_df.columns.droplevel(0)
                 candles_df["timestamp"] = pd.to_numeric(candles_df.index) // 1e9
             else:
@@ -246,10 +256,12 @@ def load_candles_cache(self, root_path: str = ""):
             except Exception as e:
                 logger.error(f"Error loading {file}: {type(e).__name__} - {e}")
 
-    async def get_trades(self, connector_name: str, trading_pair: str, start_time: int, end_time: int,
-                         from_id: Optional[int] = None):
-        return await self.trades_feeds[connector_name].get_historical_trades(trading_pair, start_time, end_time,
-                                                                             from_id)
+    async def yield_trades_chunk(self, connector_name: str, trading_pair: str, start_time: int, end_time: int,
+                                 from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
+        async for chunk in self.trades_feeds[connector_name].get_historical_trades(
+            trading_pair, start_time, end_time, from_id, max_trades_per_call
+        ):
+            yield chunk
 
     @staticmethod
     def convert_interval_to_pandas_freq(interval: str) -> str:
@@ -257,3 +269,24 @@ def convert_interval_to_pandas_freq(interval: str) -> str:
         """
         Converts a candle interval string to a pandas frequency string.
         """
         return INTERVAL_MAPPING.get(interval, 'T')
+
+    @property
+    def interval_to_seconds(self):
+        return bidict({
+            "1s": 1,
+            "1m": 60,
+            "3m": 180,
+            "5m": 300,
+            "15m": 900,
+            "30m": 1800,
+            "1h": 3600,
+            "2h": 7200,
+            "4h": 14400,
+            "6h": 21600,
+            "8h": 28800,
+            "12h": 43200,
+            "1d": 86400,
+            "3d": 259200,
+            "1w": 604800,
+            "1M": 2592000
+        })
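Reviewer note: `yield_trades_chunk` replaces the single-shot `get_trades` with an async generator, so callers receive bounded DataFrame chunks instead of one potentially huge frame. A minimal consumption sketch (connector, pair, and timestamps are illustrative; assumes the connector's feed is registered in `trades_feeds`):

```python
import asyncio

from core.data_sources.clob import CLOBDataSource


async def preview_trade_chunks():
    clob = CLOBDataSource()
    # Each iteration yields at most max_trades_per_call trades as a DataFrame,
    # keeping peak memory bounded even over long date ranges.
    async for chunk in clob.yield_trades_chunk(
        connector_name="binance_perpetual",
        trading_pair="BTC-USDT",
        start_time=1_700_000_000,
        end_time=1_700_086_400,
        max_trades_per_call=100_000,
    ):
        print(f"chunk of {len(chunk)} trades, last id {chunk['id'].iloc[-1]}")


asyncio.run(preview_trade_chunks())
```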
""" return INTERVAL_MAPPING.get(interval, 'T') + + @property + def interval_to_seconds(self): + return bidict({ + "1s": 1, + "1m": 60, + "3m": 180, + "5m": 300, + "15m": 900, + "30m": 1800, + "1h": 3600, + "2h": 7200, + "4h": 14400, + "6h": 21600, + "8h": 28800, + "12h": 43200, + "1d": 86400, + "3d": 259200, + "1w": 604800, + "1M": 2592000 + }) diff --git a/core/data_sources/trades_feed/connectors/binance_perpetual.py b/core/data_sources/trades_feed/connectors/binance_perpetual.py index 225cb16..8d5976d 100644 --- a/core/data_sources/trades_feed/connectors/binance_perpetual.py +++ b/core/data_sources/trades_feed/connectors/binance_perpetual.py @@ -34,7 +34,8 @@ def get_exchange_trading_pair(self, trading_pair: str) -> str: base, quote = trading_pair.split("-") return f"{base}{quote}" - async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int, from_id: Optional[int] = None): + async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int, + from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000): all_trades_collected = False end_ts = int(end_time * 1000) start_ts = int(start_time * 1000) @@ -58,19 +59,36 @@ async def _get_historical_trades(self, trading_pair: str, start_time: int, end_t if trades: last_timestamp = trades[-1]["T"] all_trades.extend(trades) - all_trades_collected = last_timestamp >= end_ts from_id = trades[-1]["a"] + + # Check if the buffer size is sufficient for yielding + if len(all_trades) >= max_trades_per_call: + df = pd.DataFrame(all_trades) + df.rename(columns={"T": "timestamp", "p": "price", "q": "volume", "m": "sell_taker", "a": "id"}, + inplace=True) + df.drop(columns=["f", "l"], inplace=True) + df["timestamp"] = df["timestamp"] / 1000 + df.index = pd.to_datetime(df["timestamp"], unit="s") + df["price"] = df["price"].astype(float) + df["volume"] = df["volume"].astype(float) + yield df + all_trades = [] # Reset buffer after yielding + + all_trades_collected = last_timestamp >= end_ts else: all_trades_collected = True - df = pd.DataFrame(all_trades) - df.rename(columns={"T": "timestamp", "p": "price", "q": "volume", "m": "sell_taker", "a": "id"}, inplace=True) - df.drop(columns=["f", "l"], inplace=True) - df["timestamp"] = df["timestamp"] / 1000 - df.index = pd.to_datetime(df["timestamp"], unit="s") - df["price"] = df["price"].astype(float) - df["volume"] = df["volume"].astype(float) - return df + # Yield any remaining trades after the loop + if all_trades: + df = pd.DataFrame(all_trades) + df.rename(columns={"T": "timestamp", "p": "price", "q": "volume", "m": "sell_taker", "a": "id"}, + inplace=True) + df.drop(columns=["f", "l"], inplace=True) + df["timestamp"] = df["timestamp"] / 1000 + df.index = pd.to_datetime(df["timestamp"], unit="s") + df["price"] = df["price"].astype(float) + df["volume"] = df["volume"].astype(float) + yield df async def _get_historical_trades_request(self, params: Dict): try: @@ -98,7 +116,6 @@ async def _enforce_rate_limit(self): if current_weight_usage >= self.REQUEST_WEIGHT_LIMIT: # Calculate how long to sleep to stay within the rate limit sleep_time = self.ONE_MINUTE - (current_time - self._request_timestamps[0]) - self.logger().info(f"Rate limit reached. 
diff --git a/core/data_sources/trades_feed/trades_feed_base.py b/core/data_sources/trades_feed/trades_feed_base.py
index 5aff00e..e64bbf0 100644
--- a/core/data_sources/trades_feed/trades_feed_base.py
+++ b/core/data_sources/trades_feed/trades_feed_base.py
@@ -14,12 +14,15 @@ def get_exchange_trading_pair(self, trading_pair: str) -> str:
         ...
 
     async def get_historical_trades(self, trading_pair: str, start_time: int, end_time: Optional[int] = None,
-                                    from_id: Optional[int] = None):
+                                    from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
         if not end_time:
             end_time = int(time.time())
-        historical_trades = await self._get_historical_trades(trading_pair, start_time, end_time, from_id)
-        return historical_trades
+
+        async for chunk in self._get_historical_trades(trading_pair, start_time, end_time, from_id,
+                                                       max_trades_per_call):
+            yield chunk  # Yield chunks instead of returning a single result
 
     @abstractmethod
-    async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int, from_id: Optional[int] = None):
+    async def _get_historical_trades(self, trading_pair: str, start_time: int, end_time: int,
+                                     from_id: Optional[int] = None, max_trades_per_call: int = 1_000_000):
         ...
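Reviewer note: because `get_historical_trades` now contains a `yield`, it is an async generator rather than a coroutine, which is a breaking change for every call site: `await feed.get_historical_trades(...)` must become `async for`. A migration sketch (`feed` stands for any `TradesFeedBase` subclass instance):

```python
import pandas as pd


async def collect_all_trades(feed, trading_pair: str, start: int, end: int) -> pd.DataFrame:
    # Before this change: trades = await feed.get_historical_trades(trading_pair, start, end)
    # After: iterate the generator; concatenate only if a single frame is needed.
    chunks = []
    async for chunk in feed.get_historical_trades(trading_pair, start, end,
                                                  max_trades_per_call=500_000):
        chunks.append(chunk)
    return pd.concat(chunks) if chunks else pd.DataFrame()
```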
"connector_name", "trading_pair", @@ -404,11 +413,11 @@ async def append_db_status_metrics(self, connector_name: str, trading_pair: str) metric_data["connector_name"] = connector_name metric_data["trading_pair"] = trading_pair delete_query = f""" - DELETE FROM {self.metrics_table_name} + DELETE FROM {self.trades_summary_table_name} WHERE connector_name = '{metric_data["connector_name"]}' AND trading_pair = '{metric_data["trading_pair"]}'; """ query = f""" - INSERT INTO {self.metrics_table_name} ( + INSERT INTO {self.trades_summary_table_name} ( connector_name, trading_pair, trade_amount, @@ -559,8 +568,8 @@ async def get_data_range(self, connector_name: str, trading_pair: str) -> Dict[s query = f''' SELECT - MIN(timestamp) as start_time, - MAX(timestamp) as end_time + MIN(timestamp) as start_time, + MAX(timestamp) as end_time FROM {table_name} ''' diff --git a/tasks/candles_downloader_runner.py b/tasks/candles_downloader_runner.py index df02374..c0fdbd5 100644 --- a/tasks/candles_downloader_runner.py +++ b/tasks/candles_downloader_runner.py @@ -19,11 +19,11 @@ async def main(): from tasks.data_collection.candles_downloader_task import CandlesDownloaderTask orchestrator = TaskOrchestrator() timescale_config = { - "host": os.getenv("TIMESCALE_HOST", "localhost"), - "port": os.getenv("TIMESCALE_PORT", 5432), - "user": os.getenv("TIMESCALE_USER", "admin"), - "password": os.getenv("TIMESCALE_PASSWORD", "admin"), - "database": os.getenv("TIMESCALE_DB", "timescaledb") + "db_host": os.getenv("TIMESCALE_HOST", "localhost"), + "db_port": os.getenv("TIMESCALE_PORT", 5432), + "db_user": os.getenv("TIMESCALE_USER", "admin"), + "db_password": os.getenv("TIMESCALE_PASSWORD", "admin"), + "db_name": os.getenv("TIMESCALE_DB", "timescaledb") } config = { @@ -32,6 +32,7 @@ async def main(): "intervals": ["1m", "3m", "5m", "15m", "1h"], "days_data_retention": 120, "min_notional_size": 10, + "selected_pairs": None, "timescale_config": timescale_config } @@ -42,4 +43,4 @@ async def main(): if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) \ No newline at end of file diff --git a/tasks/data_collection/candles_downloader_task.py b/tasks/data_collection/candles_downloader_task.py index bbd74f5..45a9efe 100644 --- a/tasks/data_collection/candles_downloader_task.py +++ b/tasks/data_collection/candles_downloader_task.py @@ -25,6 +25,7 @@ def __init__(self, name: str, frequency: timedelta, config: Dict[str, Any]): self.intervals = config.get("intervals", ["1m"]) self.quote_asset = config.get("quote_asset", "USDT") self.min_notional_size = Decimal(str(config.get("min_notional_size", 10.0))) + self.selected_pairs = config.get("selected_pairs") self.clob = CLOBDataSource() async def execute(self): @@ -47,7 +48,11 @@ async def execute(self): await timescale_client.connect() trading_rules = await self.clob.get_trading_rules(self.connector_name) - trading_pairs = trading_rules.get_all_trading_pairs() + trading_pairs = trading_rules.filter_by_quote_asset(self.quote_asset) \ + .filter_by_min_notional_size(self.min_notional_size) \ + .get_all_trading_pairs() + if self.selected_pairs is not None: + trading_pairs = [trading_pair for trading_pair in trading_pairs if trading_pair in self.selected_pairs] for i, trading_pair in enumerate(trading_pairs): for interval in self.intervals: logging.info(f"{now} - Fetching candles for {trading_pair} [{i} from {len(trading_pairs)}]") @@ -63,7 +68,7 @@ async def execute(self): self.connector_name, trading_pair, interval, - int(start_time), + int(start_time) - 
diff --git a/tasks/data_collection/candles_downloader_task.py b/tasks/data_collection/candles_downloader_task.py
index bbd74f5..45a9efe 100644
--- a/tasks/data_collection/candles_downloader_task.py
+++ b/tasks/data_collection/candles_downloader_task.py
@@ -25,6 +25,7 @@ def __init__(self, name: str, frequency: timedelta, config: Dict[str, Any]):
         self.intervals = config.get("intervals", ["1m"])
         self.quote_asset = config.get("quote_asset", "USDT")
         self.min_notional_size = Decimal(str(config.get("min_notional_size", 10.0)))
+        self.selected_pairs = config.get("selected_pairs")
         self.clob = CLOBDataSource()
 
     async def execute(self):
@@ -47,7 +48,11 @@ async def execute(self):
         await timescale_client.connect()
 
         trading_rules = await self.clob.get_trading_rules(self.connector_name)
-        trading_pairs = trading_rules.get_all_trading_pairs()
+        trading_pairs = trading_rules.filter_by_quote_asset(self.quote_asset) \
+            .filter_by_min_notional_size(self.min_notional_size) \
+            .get_all_trading_pairs()
+        if self.selected_pairs is not None:
+            trading_pairs = [trading_pair for trading_pair in trading_pairs if trading_pair in self.selected_pairs]
         for i, trading_pair in enumerate(trading_pairs):
             for interval in self.intervals:
                 logging.info(f"{now} - Fetching candles for {trading_pair} [{i} from {len(trading_pairs)}]")
@@ -63,7 +68,7 @@ async def execute(self):
                         self.connector_name,
                         trading_pair,
                         interval,
-                        int(start_time),
+                        int(start_time) - self.clob.interval_to_seconds[interval] * 10,
                         int(end_time.timestamp()),
                     )
 
@@ -97,18 +102,19 @@ async def main(config):
 
 if __name__ == "__main__":
     timescale_config = {
-        "host": os.getenv("TIMESCALE_HOST", "localhost"),
-        "port": os.getenv("TIMESCALE_PORT", 5432),
-        "user": os.getenv("TIMESCALE_USER", "admin"),
-        "password": os.getenv("TIMESCALE_PASSWORD", "admin"),
-        "database": os.getenv("TIMESCALE_DB", "timescaledb")
+        "db_host": os.getenv("TIMESCALE_HOST", "localhost"),
+        "db_port": os.getenv("TIMESCALE_PORT", 5432),
+        "db_user": os.getenv("TIMESCALE_USER", "admin"),
+        "db_password": os.getenv("TIMESCALE_PASSWORD", "admin"),
+        "db_name": os.getenv("TIMESCALE_DB", "timescaledb")
     }
 
     config = {
         "connector_name": "binance_perpetual",
         "quote_asset": "USDT",
-        "intervals": ["15m", "1h"],
+        "intervals": ["1m", "3m", "5m", "15m", "1h"],
        "days_data_retention": 30,
         "min_notional_size": 10,
+        "selected_pairs": None,
         "timescale_config": timescale_config
     }
-    asyncio.run(main(config))
+    asyncio.run(main(config))
\ No newline at end of file
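Reviewer note: widening the fetch window with `int(start_time) - self.clob.interval_to_seconds[interval] * 10` re-downloads the last ten candles, so a partially filled final candle already in the table gets refreshed with complete data on the next run. A worked sketch of the arithmetic (values illustrative):

```python
interval_to_seconds = {"1m": 60, "15m": 900, "1h": 3600}  # subset of the bidict in clob.py

last_stored_ts = 1_700_000_000               # newest candle already in TimescaleDB
interval = "15m"
overlap = interval_to_seconds[interval] * 10  # 9_000 s = ten 15m candles
fetch_from = last_stored_ts - overlap
print(fetch_from)                             # 1699991000
```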
{trading_pair}") - continue - - trades["connector_name"] = self.connector_name - trades["trading_pair"] = trading_pair - - trades_data = trades[ - ["id", "connector_name", "trading_pair", "timestamp", "price", "volume", - "sell_taker"]].values.tolist() - - await timescale_client.append_trades(table_name=table_name, - trades=trades_data) + # Process trades in chunks + async for trades in self.clob.yield_trades_chunk(self.connector_name, trading_pair, + int(start_time.timestamp()), int(end_time.timestamp()), + last_trade_id, self.max_trades_per_call): + if trades.empty: + logging.info(f"{self.now()} - No new trades for {trading_pair}") + continue + + trades["connector_name"] = self.connector_name + trades["trading_pair"] = trading_pair + + trades_data = trades[ + ["id", "connector_name", "trading_pair", "timestamp", "price", "volume", + "sell_taker"]].values.tolist() + await timescale_client.append_trades(table_name=table_name, trades=trades_data) + first_timestamp = pd.to_datetime(trades['timestamp'], unit="s").min().strftime('%Y-%m-%d %H:%m') + last_timestamp = pd.to_datetime(trades['timestamp'], unit="s").max().strftime('%Y-%m-%d %H:%m') + logging.info(f"Successfully appended {trading_pair} {len(trades_data)} trades from {first_timestamp} to {last_timestamp}") + + # Cleanup and metrics after all chunks are processed today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) cutoff_timestamp = (today_start - timedelta(days=self.days_data_retention)).timestamp() await timescale_client.delete_trades(connector_name=self.connector_name, trading_pair=trading_pair, timestamp=cutoff_timestamp) - # TODO: isolate resampling and metrics management in another module - # TODO: pass list of intervals to perform better await timescale_client.compute_resampled_ohlc(connector_name=self.connector_name, trading_pair=trading_pair, interval="1s") - - logging.info(f"{self.now()} - Inserted {len(trades_data)} trades for {trading_pair}") - + logging.info(f"{self.now()} - Updated metrics for {trading_pair}") + await timescale_client.append_db_status_metrics(connector_name=self.connector_name, + trading_pair=trading_pair) except Exception as e: logging.exception(f"{self.now()} - An error occurred during the data load for trading pair {trading_pair}:\n {e}") continue @@ -96,15 +103,30 @@ def now(): return datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f UTC') -if __name__ == "__main__": - config = { - 'connector_name': 'binance_perpetual', - 'quote_asset': 'USDT', - 'min_notional_size': 10.0, - 'db_host': 'localhost', - 'db_port': 5432, - 'db_name': 'timescaledb' +async def main(): + timescale_config = { + "host": os.getenv("TIMESCALE_HOST", "localhost"), + "port": os.getenv("TIMESCALE_PORT", 5432), + "user": os.getenv("TIMESCALE_USER", "admin"), + "password": os.getenv("TIMESCALE_PASSWORD", "admin"), + "database": os.getenv("TIMESCALE_DB", "timescaledb") } - task = TradesDownloaderTask("Trades Downloader", timedelta(hours=1), config) - asyncio.run(task.execute()) + trades_downloader_task = TradesDownloaderTask( + name="Trades Downloader Binance", + config={ + "timescale_config": timescale_config, + "connector_name": "binance_perpetual", + "quote_asset": "USDT", + "min_notional_size": 10.0, + "days_data_retention": 10, + "selected_pairs": None, + "max_trades_per_call": 1_000_000 + }, + frequency=timedelta(hours=5)) + + await trades_downloader_task.execute() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git 
diff --git a/tasks/trades_downloader_runner.py b/tasks/trades_downloader_runner.py
index ac23151..4abc5e3 100644
--- a/tasks/trades_downloader_runner.py
+++ b/tasks/trades_downloader_runner.py
@@ -32,7 +32,9 @@ async def main():
             "connector_name": "binance_perpetual",
             "quote_asset": "USDT",
             "min_notional_size": 10.0,
-            "days_data_retention": 10
+            "days_data_retention": 10,
+            "max_trades_per_call": 1_000_000,
+            "selected_pairs": None
         },
         frequency=timedelta(hours=5))
 
@@ -41,4 +43,4 @@ async def main():
 
 
 if __name__ == "__main__":
-    asyncio.run(main())
+    asyncio.run(main())
\ No newline at end of file