Skip to content

Commit

Permalink
progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonilla-tao committed Jan 31, 2025
1 parent 4a725ba commit c05d5de
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 61 deletions.
Empty file added neurons/__init__.py
Empty file.
253 changes: 253 additions & 0 deletions neurons/backtest_manager.py

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion shared_objects/cache_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ class CacheController:
MAX_DAILY_DRAWDOWN = 'MAX_DAILY_DRAWDOWN'
MAX_TOTAL_DRAWDOWN = 'MAX_TOTAL_DRAWDOWN'

def __init__(self, metagraph=None, running_unit_tests=False):
def __init__(self, metagraph=None, running_unit_tests=False, is_backtesting=False):
self.running_unit_tests = running_unit_tests
self.init_cache_files()
self.metagraph = metagraph # Refreshes happen on validator
self.is_backtesting = is_backtesting
self._last_update_time_ms = 0
self.DD_V2_TIME = TimeUtil.millis_to_datetime(1715359820000 + 1000 * 60 * 60 * 2) # 5/10/24 TODO: Update before mainnet release

Expand Down Expand Up @@ -62,6 +63,8 @@ def generate_elimination_row(hotkey, dd, reason, t_ms=None, price_info=None, ret

def refresh_allowed(self, refresh_interval_ms):
    """Return True when a refresh may proceed.

    Records the attempt timestamp, then allows the refresh either
    unconditionally (backtesting mode) or once more than
    ``refresh_interval_ms`` has elapsed since the last recorded update.
    """
    self.attempted_start_time_ms = TimeUtil.now_in_millis()
    # Backtests never throttle: every requested refresh is allowed.
    if self.is_backtesting:
        return True
    elapsed_ms = self.attempted_start_time_ms - self.get_last_update_time_ms()
    return elapsed_ms > refresh_interval_ms

def init_cache_files(self) -> None:
Expand Down
15 changes: 11 additions & 4 deletions time_util/time_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# developer: Taoshidev
# Copyright © 2024 Taoshi Inc
import functools
import re
import time
from datetime import datetime, timedelta, timezone
Expand Down Expand Up @@ -195,14 +196,20 @@ def my_function():
pass
"""
def timeme(func):
    """Decorator that prints the wall-clock runtime of ``func``.

    If the first positional argument (typically ``self`` on a method)
    exposes a truthy ``is_backtesting`` attribute, timing output is
    suppressed so backtests stay quiet and fast.

    Fixes over the previous version:
    - No explicit ``self`` parameter, so zero-argument free functions can
      be decorated without raising TypeError.
    - Drops the ``isinstance(self, object)`` check, which was always True.
    - ``functools.wraps`` alone copies metadata; the redundant second
      ``functools.update_wrapper`` call on return is removed.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Peek at args[0] only when it exists; getattr with a False default
        # matches the original hasattr-and-truthy semantics.
        if args and getattr(args[0], "is_backtesting", False):
            return func(*args, **kwargs)  # Skip timing during backtests

        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"{func.__name__} took {end - start:.6f} s to run")
        return result

    return wrapper

class UnifiedMarketCalendar:
def __init__(self):
Expand Down
25 changes: 16 additions & 9 deletions vali_objects/utils/challengeperiod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,31 @@

class ChallengePeriodManager(CacheController):
def __init__(self, metagraph, perf_ledger_manager : PerfLedgerManager =None, running_unit_tests=False,
position_manager: PositionManager =None, ipc_manager=None):
super().__init__(metagraph, running_unit_tests=running_unit_tests)
position_manager: PositionManager =None, ipc_manager=None, is_backtesting=False):
super().__init__(metagraph, running_unit_tests=running_unit_tests, is_backtesting=is_backtesting)
self.perf_ledger_manager = perf_ledger_manager if perf_ledger_manager else \
PerfLedgerManager(metagraph, running_unit_tests=running_unit_tests)
self.position_manager = position_manager
self.elimination_manager = self.position_manager.elimination_manager
disk_challenegeperiod_testing = self.get_challengeperiod_testing(from_disk=True)
disk_challenegeperiod_success = self.get_challengeperiod_success(from_disk=True)

if self.is_backtesting:
initial_challenegeperiod_testing = {}
initial_challenegeperiod_success = {}
else:
initial_challenegeperiod_testing = self.get_challengeperiod_testing(from_disk=True)
initial_challenegeperiod_success = self.get_challengeperiod_success(from_disk=True)
self.using_ipc = bool(ipc_manager)
if ipc_manager:
self.challengeperiod_testing = ipc_manager.dict()
self.challengeperiod_success = ipc_manager.dict()
for k, v in disk_challenegeperiod_testing.items():
for k, v in initial_challenegeperiod_testing.items():
self.challengeperiod_testing[k] = v
for k, v in disk_challenegeperiod_success.items():
for k, v in initial_challenegeperiod_success.items():
self.challengeperiod_success[k] = v
else:
self.challengeperiod_testing = disk_challenegeperiod_testing
self.challengeperiod_success = disk_challenegeperiod_success
if len(self.get_challengeperiod_testing()) == 0 and len(self.get_challengeperiod_success()) == 0:
self.challengeperiod_testing = initial_challenegeperiod_testing
self.challengeperiod_success = initial_challenegeperiod_success
if not self.is_backtesting and len(self.get_challengeperiod_testing()) == 0 and len(self.get_challengeperiod_success()) == 0:
ValiBkpUtils.write_file(
ValiBkpUtils.get_challengeperiod_file_location(running_unit_tests=self.running_unit_tests),
{"testing": {}, "success": {}}
Expand Down Expand Up @@ -445,6 +450,8 @@ def _demote_challengeperiod_in_memory(self, hotkeys: list[str]):
self.elimination_manager.append_elimination_row(hotkey, -1, 'FAILED_CHALLENGE_PERIOD')

def _write_challengeperiod_from_memory_to_disk(self):
if self.is_backtesting:
return
challengeperiod_data = {
"testing": self.get_challengeperiod_testing(),
"success": self.get_challengeperiod_success()
Expand Down
7 changes: 4 additions & 3 deletions vali_objects/utils/elimination_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class EliminationManager(CacheController):
"""

def __init__(self, metagraph, position_manager, challengeperiod_manager,
running_unit_tests=False, shutdown_dict=None, ipc_manager=None):
super().__init__(metagraph=metagraph)
running_unit_tests=False, shutdown_dict=None, ipc_manager=None, is_backtesting=False):
super().__init__(metagraph=metagraph, is_backtesting=is_backtesting)
self.position_manager = position_manager
self.shutdown_dict = shutdown_dict
self.challengeperiod_manager = challengeperiod_manager
Expand Down Expand Up @@ -183,7 +183,8 @@ def _delete_eliminated_expired_miners(self):
bt.logging.info(f"Zombie miner dir not found. Already deleted. [{miner_dir}]")

def save_eliminations(self):
    """Flush the in-memory eliminations list to disk.

    Backtesting runs must not touch the filesystem, so this becomes a
    no-op when ``is_backtesting`` is set.
    """
    if self.is_backtesting:
        return
    self.write_eliminations_to_disk(self.eliminations)

def write_eliminations_to_disk(self, eliminations):
if not isinstance(eliminations, list):
Expand Down
46 changes: 26 additions & 20 deletions vali_objects/utils/position_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ def __init__(self, metagraph=None, running_unit_tests=False,
elimination_manager=None,
secrets=None,
ipc_manager=None,
live_price_fetcher=None):
live_price_fetcher=None,
is_backtesting=False):

super().__init__(metagraph=metagraph, running_unit_tests=running_unit_tests)
super().__init__(metagraph=metagraph, running_unit_tests=running_unit_tests, is_backtesting=is_backtesting)
# Populate memory with positions

self.perf_ledger_manager = perf_ledger_manager
Expand All @@ -55,13 +56,16 @@ def __init__(self, metagraph=None, running_unit_tests=False,
else:
self.hotkey_to_positions = {}
self.secrets = secrets
self.populate_memory_positions_for_first_time()
self._populate_memory_positions_for_first_time()
self.live_price_fetcher = live_price_fetcher

@timeme
def _populate_memory_positions_for_first_time(self):
    """Seed the in-memory position map from disk on startup.

    Backtesting starts from an empty in-memory state and never reads the
    miner directories, so this returns immediately in that mode.
    """
    if self.is_backtesting:
        return

    disk_positions_by_hotkey = self.get_positions_for_all_miners(from_disk=True)
    for hotkey, positions in disk_positions_by_hotkey.items():
        # Only hotkeys that actually have positions on disk get an entry.
        if positions:
            self.hotkey_to_positions[hotkey] = positions

Expand Down Expand Up @@ -919,17 +923,18 @@ def _save_miner_position_to_memory(self, position: Position):


def save_miner_position(self, position: Position, delete_open_position_if_exists=True) -> None:
    """Persist a position to disk (skipped in backtesting) and mirror it in memory.

    For a closed position, the stale open-position file is removed first
    (when ``delete_open_position_if_exists`` is True); for an open position,
    the write is verified before committing the file.
    """
    if not self.is_backtesting:
        status = OrderStatus.OPEN if position.is_open_position else OrderStatus.CLOSED
        target_dir = ValiBkpUtils.get_partitioned_miner_positions_dir(
            position.miner_hotkey,
            position.trade_pair.trade_pair_id,
            order_status=status,
            running_unit_tests=self.running_unit_tests)

        if position.is_closed_position and delete_open_position_if_exists:
            self.delete_open_position_if_exists(position)
        elif position.is_open_position:
            self.verify_open_position_write(target_dir, position)

        #print(f'Saving position {position.position_uuid} for miner {position.miner_hotkey} and trade pair {position.trade_pair.trade_pair_id} is_open {position.is_open_position}')
        ValiBkpUtils.write_file(target_dir + position.position_uuid, position)

    # Memory is always updated, even when the disk write was skipped.
    self._save_miner_position_to_memory(position)

def overwrite_position_on_disk(self, position: Position) -> None:
Expand Down Expand Up @@ -1009,9 +1014,10 @@ def delete_position(self, p: Position, check_open_and_closed_dirs=False):
else:
file_paths = [self.get_filepath_for_position(hotkey, trade_pair_id, position_uuid, is_open)]
for fp in file_paths:
if os.path.exists(fp):
os.remove(fp)
bt.logging.info(f"Deleted position from disk: {fp}")
if not self.is_backtesting:
if os.path.exists(fp):
os.remove(fp)
bt.logging.info(f"Deleted position from disk: {fp}")
self._delete_position_from_memory(hotkey, position_uuid)

def _delete_position_from_memory(self, hotkey, position_uuid):
Expand Down
45 changes: 27 additions & 18 deletions vali_objects/utils/subtensor_weight_setter.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
# developer: jbonilla
from functools import partial

import bittensor as bt

from time_util.time_util import TimeUtil
from time_util.time_util import TimeUtil, timeme
from vali_objects.vali_config import ValiConfig
from shared_objects.cache_controller import CacheController
from vali_objects.utils.position_manager import PositionManager
from vali_objects.scoring.scoring import Scoring

class SubtensorWeightSetter(CacheController):
def __init__(self, metagraph, position_manager: PositionManager,
             running_unit_tests=False, is_backtesting=False):
    """Bind the weight setter to *metagraph* and *position_manager*.

    ``is_backtesting`` is forwarded to the cache controller so downstream
    refresh/persistence logic can short-circuit during backtests.
    """
    super().__init__(metagraph, running_unit_tests=running_unit_tests, is_backtesting=is_backtesting)
    self.position_manager = position_manager
    self.perf_ledger_manager = position_manager.perf_ledger_manager
    self.subnet_version = 200
    # Most recent scoring output, retained so backtesting can read the
    # computed weights back without a subtensor round trip.
    self.checkpoint_results = []
    self.transformed_list = []

def compute_weights_default(self, current_time: int, metagraph_hotkeys: list[int], ) -> tuple[list[tuple[str, float]], list[tuple[str, float]]]:
def compute_weights_default(self, current_time: int) -> tuple[list[tuple[str, float]], list[tuple[str, float]]]:
if current_time is None:
current_time = TimeUtil.now_in_millis()

testing_hotkeys = list(self.position_manager.challengeperiod_manager.challengeperiod_testing.keys())
success_hotkeys = list(self.position_manager.challengeperiod_manager.challengeperiod_success.keys())

if self.is_backtesting:
hotkeys_to_compute_weights_for = testing_hotkeys + success_hotkeys
else:
hotkeys_to_compute_weights_for = success_hotkeys
# only collect ledger elements for the miners that passed the challenge period
filtered_ledger = self.perf_ledger_manager.filtered_ledger_for_scoring(hotkeys=success_hotkeys)
filtered_positions, _ = self.position_manager.filtered_positions_for_scoring(hotkeys=success_hotkeys)
filtered_ledger = self.perf_ledger_manager.filtered_ledger_for_scoring(hotkeys=hotkeys_to_compute_weights_for)
filtered_positions, _ = self.position_manager.filtered_positions_for_scoring(hotkeys=hotkeys_to_compute_weights_for)


if len(filtered_ledger) == 0:
Expand All @@ -45,19 +51,19 @@ def compute_weights_default(self, current_time: int, metagraph_hotkeys: list[int

checkpoint_netuid_weights = []
for miner, score in checkpoint_results:
if miner in metagraph_hotkeys:
if miner in self.metagraph.hotkeys:
checkpoint_netuid_weights.append((
metagraph_hotkeys.index(miner),
self.metagraph.hotkeys.index(miner),
score
))
else:
bt.logging.error(f"Miner {miner} not found in the metagraph.")

challengeperiod_weights = []
for miner in testing_hotkeys:
if miner in metagraph_hotkeys:
if miner in self.metagraph.hotkeys:
challengeperiod_weights.append((
metagraph_hotkeys.index(miner),
self.metagraph.hotkeys.index(miner),
ValiConfig.CHALLENGE_PERIOD_WEIGHT
))
else:
Expand All @@ -70,21 +76,24 @@ def _store_weights(self, checkpoint_results: list[tuple[str, float]], transforme
self.checkpoint_results = checkpoint_results
self.transformed_list = transformed_list

@timeme
def set_weights(self, wallet, netuid, subtensor, current_time: int = None, scoring_function: callable = None, scoring_func_args: dict = None):
    """Score miners and (outside backtesting) push the weights on-chain.

    Args:
        wallet: validator wallet used to sign the weight extrinsic.
        netuid: subnet uid the weights apply to.
        subtensor: chain connection used for the actual weight set.
        current_time: scoring timestamp in ms; defaults inside the scorer.
        scoring_function: optional override; unbound callables are bound
            to this instance via ``partial`` so they can use ``self``.
        scoring_func_args: kwargs for the scoring function; defaults to
            ``{'current_time': current_time}``.

    Throttled by ``refresh_allowed``; always records results in memory via
    ``_store_weights`` so backtesting consumers can read them back.
    """
    if not self.refresh_allowed(ValiConfig.SET_WEIGHT_REFRESH_TIME_MS):
        return
    bt.logging.info("running set weights")
    if scoring_func_args is None:
        scoring_func_args = {'current_time': current_time}

    if scoring_function is None:
        scoring_function = self.compute_weights_default  # Uses instance method
    elif not hasattr(scoring_function, '__self__'):
        scoring_function = partial(scoring_function, self)  # Only bind if external

    checkpoint_results, transformed_list = scoring_function(**scoring_func_args)
    # Consistency fix: route through _store_weights instead of duplicating
    # its two assignments inline (the helper otherwise sat dead).
    self._store_weights(checkpoint_results, transformed_list)
    if not self.is_backtesting:
        self._set_subtensor_weights(wallet, subtensor, netuid)
    self.set_last_update_time()


Expand Down
16 changes: 10 additions & 6 deletions vali_objects/vali_dataclasses/perf_ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ def get_total_ledger_duration_ms(self):
class PerfLedgerManager(CacheController):
def __init__(self, metagraph, ipc_manager=None, running_unit_tests=False, shutdown_dict=None,
perf_ledger_hks_to_invalidate=None, live_price_fetcher=None, position_manager=None,
enable_rss=True):
super().__init__(metagraph=metagraph, running_unit_tests=running_unit_tests)
enable_rss=True, is_backtesting=False):
super().__init__(metagraph=metagraph, running_unit_tests=running_unit_tests, is_backtesting=is_backtesting)
self.shutdown_dict = shutdown_dict
self.live_price_fetcher = live_price_fetcher
self.running_unit_tests = running_unit_tests
Expand All @@ -357,7 +357,7 @@ def __init__(self, metagraph, ipc_manager=None, running_unit_tests=False, shutdo
self.hotkey_to_perf_bundle = {}
self.running_unit_tests = running_unit_tests
self.position_manager = position_manager
self.pds = None # Not pickable. Load it later once the process starts
self.pds = live_price_fetcher.polygon_data_service if live_price_fetcher else None # Load it later once the process starts so ipc works.
self.live_price_fetcher = live_price_fetcher # For unit tests only

# Every update, pick a hotkey to rebuild in case polygon 1s candle data changed.
Expand All @@ -383,7 +383,8 @@ def __init__(self, metagraph, ipc_manager=None, running_unit_tests=False, shutdo
self.mode_to_n_updates = {}
self.update_to_n_open_positions = {}
self.position_uuid_to_cache = defaultdict(FeeCache)
for k, v in self.get_perf_ledgers(from_disk=True, portfolio_only=False).items():
initial_perf_ledgers = {} if self.is_backtesting else self.get_perf_ledgers(from_disk=True, portfolio_only=False)
for k, v in initial_perf_ledgers.items():
self.hotkey_to_perf_bundle[k] = v

def _is_v1_perf_ledger(self, ledger_value):
Expand Down Expand Up @@ -1166,7 +1167,8 @@ def update_all_perf_ledgers(self, hotkey_to_positions: dict[str, List[Position]]
n_perf_ledgers = len(existing_perf_ledgers) if existing_perf_ledgers else 0
n_hotkeys_with_positions = len(hotkey_to_positions) if hotkey_to_positions else 0
bt.logging.success(f"Done updating perf ledger for all hotkeys in {time.time() - t_init} s. n_perf_ledgers {n_perf_ledgers}. n_hotkeys_with_positions {n_hotkeys_with_positions}")
self.write_perf_ledger_eliminations_to_disk(self.candidate_pl_elimination_rows)
if not self.is_backtesting:
self.write_perf_ledger_eliminations_to_disk(self.candidate_pl_elimination_rows)
# clear and populate proxy list in a multiprocessing-friendly way
del self.pl_elimination_rows[:]
self.pl_elimination_rows.extend(self.candidate_pl_elimination_rows)
Expand Down Expand Up @@ -1210,6 +1212,7 @@ def generate_perf_ledgers_for_analysis(self, hotkey_to_positions: dict[str, List
existing_perf_ledgers = {}
return self.update_all_perf_ledgers(hotkey_to_positions, existing_perf_ledgers, t_ms)

@timeme
def update(self, testing_one_hotkey=None, regenerate_all_ledgers=False, t_ms=None):
assert self.position_manager.elimination_manager.metagraph, "Metagraph must be loaded before updating perf ledgers"
assert self.metagraph, "Metagraph must be loaded before updating perf ledgers"
Expand Down Expand Up @@ -1389,7 +1392,8 @@ def save_perf_ledgers_to_disk(self, perf_ledgers: dict[str, dict[str, PerfLedger

@timeme
def save_perf_ledgers(self, perf_ledgers_copy: dict[str, dict[str, PerfLedger]] | dict[str, dict[str, dict]], raw_json=False):
self.save_perf_ledgers_to_disk(perf_ledgers_copy, raw_json=raw_json)
if not self.is_backtesting:
self.save_perf_ledgers_to_disk(perf_ledgers_copy, raw_json=raw_json)

# Update memory
for k in list(self.hotkey_to_perf_bundle.keys()):
Expand Down

0 comments on commit c05d5de

Please sign in to comment.