Skip to content

Commit

Permalink
Ohhhhhh multithreading you make me want to flip back to mp
Browse files Browse the repository at this point in the history
  • Loading branch information
donewiththedollar committed Apr 17, 2024
1 parent 0e5bd62 commit 7cae8a3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def run_single_symbol(self, symbol, rotator_symbols_standardized=None):
since_timestamp = int((datetime.now() - timedelta(days=1)).timestamp() * 1000) # 24 hours ago in milliseconds
recent_trades = self.fetch_recent_trades_for_symbol(symbol, since=since_timestamp, limit=20)

logging.info(f"Recent trades for {symbol} : {recent_trades}")
#logging.info(f"Recent trades for {symbol} : {recent_trades}")

# Check if there are any trades in the last 24 hours
recent_activity = any(trade['timestamp'] >= since_timestamp for trade in recent_trades)
Expand Down
62 changes: 9 additions & 53 deletions multi_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,14 @@ def bybit_auto_rotation(args, manager, symbols_allowed):

# Periodically fetch and update latest rotation symbols
if current_time - last_rotator_update_time >= 50:
latest_rotator_symbols = fetch_updated_symbols(args, manager) # This function needs to be implemented to fetch latest rotatable symbols
latest_rotator_symbols = fetch_updated_symbols(args, manager)
last_rotator_update_time = current_time
logging.info(f"Latest rotator symbols: {latest_rotator_symbols}")

with thread_management_lock:
# Update active symbols based on thread status
update_active_symbols()

# Start new threads for open positions not currently active
update_active_threads(open_position_symbols, args, manager, symbols_allowed)

Expand All @@ -418,6 +421,10 @@ def bybit_auto_rotation(args, manager, symbols_allowed):
except Exception as e:
logging.error(f"Exception caught in bybit_auto_rotation: {str(e)}")

def update_active_symbols():
global active_symbols
active_symbols = {symbol for symbol in active_symbols if symbol in threads and threads[symbol].is_alive()}

def update_active_threads(open_position_symbols, args, manager, symbols_allowed):
global active_symbols
for symbol in open_position_symbols:
Expand Down Expand Up @@ -447,21 +454,6 @@ def manage_excess_threads(symbols_allowed):
remove_thread_for_symbol(symbol_to_remove)
logging.info(f"Removed excess thread for symbol: {symbol_to_remove}")


def manage_and_rotate_threads(args, manager, symbols_allowed, open_position_symbols, latest_rotator_symbols):
active_symbols = manage_threads(args, manager, open_position_symbols) # This updates global active_symbols
remaining_slots = symbols_allowed - len(active_symbols)

# Start threads for additional symbols if there are slots available
for symbol in open_position_symbols.union(latest_rotator_symbols):
if symbol not in active_symbols and remaining_slots > 0:
if symbol not in threads or not threads[symbol].is_alive():
start_thread_for_symbol(symbol, args, manager, symbols_allowed)
active_symbols.add(symbol)
remaining_slots -= 1 # Decrease the slot count
else:
logging.info(f"Thread for symbol {symbol} is already active. Skipping.")

def remove_thread_for_symbol(symbol):
"""Safely removes a thread associated with a symbol."""
thread = threads.get(symbol)
Expand All @@ -482,7 +474,6 @@ def start_thread_for_symbol(symbol, args, manager):
logging.error(f"Error starting thread for symbol {symbol}: {e}")
return False # Failed to start thread


def fetch_updated_symbols(args, manager):
"""Fetches and logs potential symbols based on the current trading strategy."""
strategy = args.strategy.lower()
Expand Down Expand Up @@ -522,42 +513,7 @@ def log_symbol_details(strategy, symbols):
logging.info(f"Potential symbols for {strategy}: {symbols}")
else:
logging.info(f"Other strategy symbols: {symbols}")

def manage_threads(args, manager, symbols_allowed, open_position_symbols):
"""Ensure that every open position symbol has an active thread and handle thread lifecycle."""
global active_symbols # Use the global active_symbols variable
active_symbols.clear() # Clear the active_symbols set before updating it
for symbol in open_position_symbols:
if symbol not in threads or not threads[symbol].is_alive():
logging.warning(f"No active thread for open position symbol: {symbol}. Starting a new thread.")
start_thread_for_symbol(symbol, args, manager, symbols_allowed)
active_symbols.add(symbol)
else:
active_symbols.add(symbol) # Add to active if already running
return active_symbols

def rotate_and_refresh_threads(args, manager, symbols_allowed, active_symbols, open_position_symbols):
"""Rotate out inactive threads and refresh the list by adding new threads from latest symbols."""
# Start threads for additional symbols from the latest symbol set
for symbol in latest_rotator_symbols:
if symbol not in active_symbols and len(active_symbols) < symbols_allowed:
start_thread_for_symbol(symbol, args, manager, symbols_allowed)
active_symbols.add(symbol)

# Remove threads that are no longer active or relevant
for symbol in list(active_symbols):
if symbol not in open_position_symbols and (symbol in threads and not threads[symbol].is_alive()):
remove_inactive_thread(symbol)
active_symbols.remove(symbol) # Clean up symbol from active list if thread is not alive and symbol is not in open positions

def remove_inactive_thread(symbol):
"""Remove a thread that is no longer active."""
if symbol in threads:
logging.info(f"Removing inactive thread for symbol: {symbol}")
threads[symbol].join()
del threads[symbol]



def hyperliquid_auto_rotation(args, manager, symbols_allowed):
# Fetching open position symbols and standardizing them
open_position_symbols = {standardize_symbol(pos['symbol']) for pos in market_maker.exchange.get_all_open_positions_hyperliquid()}
Expand Down

0 comments on commit 7cae8a3

Please sign in to comment.