diff --git a/multi_bot.py b/multi_bot.py index e7117a34..7aae4617 100644 --- a/multi_bot.py +++ b/multi_bot.py @@ -281,14 +281,12 @@ def get_symbols(self): orders_canceled = False -def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbols_standardized): +def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbols_standardized, thread_completed): global orders_canceled current_thread = threading.current_thread() - try: with thread_to_symbol_lock: thread_to_symbol[current_thread] = symbol - time.sleep(1) # Correct the path for the configuration file @@ -296,6 +294,7 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol config_file_path = Path('configs/' + args.config) else: config_file_path = Path(args.config) + print("Loading config from:", config_file_path) config = load_config(config_file_path) @@ -310,12 +309,12 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol print(f"Trading symbol: {symbol}") print(f"Exchange name: {exchange_name}") print(f"Strategy name: {strategy_name}") - print(f"Account name: {account_name}") + print(f"Account name: {account_name}") # Pass account_name to DirectionalMarketMaker constructor market_maker = DirectionalMarketMaker(config, exchange_name, account_name) market_maker.manager = manager - + try: # Cancel all open orders at the startup of the first thread only if not orders_canceled and hasattr(market_maker.exchange, 'cancel_all_open_orders_bybit'): @@ -323,8 +322,8 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol logging.info(f"Cleared all open orders on the exchange upon initialization.") orders_canceled = True # Set the flag to True to prevent future cancellations except Exception as e: - logging.info(f"Excetion caught {e}") - + logging.info(f"Exception caught {e}") + market_maker.run_strategy(symbol, args.strategy, config, account_name, symbols_to_trade=symbols_allowed, rotator_symbols_standardized=rotator_symbols_standardized) quote = "USDT" @@ -339,8 +338,13 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol # cached_balance = market_maker.get_balance(quote) # print(f"Futures balance: {cached_balance}") # last_balance_fetch_time = current_time + + # Signal thread completion + thread_completed.set() + except Exception as e: logging.error(f"An error occurred in run_bot for symbol {symbol}: {e}") + thread_completed.set() # Signal thread completion even in case of an exception finally: with thread_to_symbol_lock: @@ -348,47 +352,6 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol del thread_to_symbol[current_thread] logging.info(f"Thread for symbol {symbol} has completed.") -def rotate_inactive_symbols(active_symbols, rotator_symbols_queue, thread_start_time, rotation_threshold=160, max_symbols_allowed=5): - current_time = time.time() - rotated_out_symbols = [] - added_symbols = [] - - for symbol in list(active_symbols): - if current_time - thread_start_time.get(symbol, 0) > rotation_threshold: - active_symbols.remove(symbol) - del thread_start_time[symbol] # Remove symbol from thread_start_time tracking - rotated_out_symbols.append(symbol) - - # Add new symbol from the rotator queue if it doesn't exceed max_symbols_allowed - while len(rotator_symbols_queue) > 0 and len(active_symbols) < max_symbols_allowed: - new_symbol = rotator_symbols_queue.popleft() # Get the next symbol from the queue - if new_symbol not in active_symbols: - active_symbols.add(new_symbol) - thread_start_time[new_symbol] = current_time - added_symbols.append(new_symbol) - rotator_symbols_queue.append(new_symbol) # Add it back to the end of the queue - break - - if rotated_out_symbols: - logging.info(f"Rotated out symbols: {rotated_out_symbols}") - if added_symbols: - logging.info(f"Added new symbols: {added_symbols}") - - return active_symbols, thread_start_time - - -# Define the update function for the rotator queue -def update_rotator_queue(rotator_queue, latest_symbols): - # Convert the queue to a set for efficient operations - rotator_set = set(rotator_queue) - # Add new symbols to the set - rotator_set.update(latest_symbols) - # Remove symbols no longer in the latest list - rotator_set.intersection_update(latest_symbols) - time.sleep(1) - # Return a new deque from the updated set - return deque(rotator_set) - def bybit_auto_rotation(args, manager, symbols_allowed): global latest_rotator_symbols, threads, active_symbols, last_rotator_update_time @@ -417,18 +380,31 @@ def bybit_auto_rotation(args, manager, symbols_allowed): # Handle new symbols from the rotator within the allowed limits manage_rotator_symbols(latest_rotator_symbols, args, manager, symbols_allowed) + + # Check for completed threads and perform cleanup + completed_symbols = [] + for symbol, (thread, thread_completed) in threads.items(): + if thread_completed.is_set(): + thread.join() # Wait for the thread to complete + completed_symbols.append(symbol) + + # Remove completed symbols from active_symbols and threads + for symbol in completed_symbols: + active_symbols.discard(symbol) + del threads[symbol] + del thread_start_time[symbol] except Exception as e: logging.error(f"Exception caught in bybit_auto_rotation: {str(e)}") def update_active_symbols(): global active_symbols - active_symbols = {symbol for symbol in active_symbols if symbol in threads and threads[symbol].is_alive()} + active_symbols = {symbol for symbol in active_symbols if symbol in threads and threads[symbol][0].is_alive()} def update_active_threads(open_position_symbols, args, manager, symbols_allowed): global active_symbols for symbol in open_position_symbols: - if symbol not in active_symbols or (symbol in threads and not threads[symbol].is_alive()): + if symbol not in active_symbols or (symbol in threads and not threads[symbol][0].is_alive()): if start_thread_for_symbol(symbol, args, manager): active_symbols.add(symbol) logging.info(f"Started or restarted thread for symbol: {symbol}") @@ -440,7 +416,7 @@ def manage_rotator_symbols(rotator_symbols, args, manager, symbols_allowed): for symbol in rotator_symbols: if needed_slots <= 0: break - if symbol not in active_symbols and (symbol not in threads or not threads[symbol].is_alive()): + if symbol not in active_symbols and (symbol not in threads or not threads[symbol][0].is_alive()): if start_thread_for_symbol(symbol, args, manager): active_symbols.add(symbol) needed_slots -= 1 @@ -456,8 +432,9 @@ def manage_excess_threads(symbols_allowed): def remove_thread_for_symbol(symbol): """Safely removes a thread associated with a symbol.""" - thread = threads.get(symbol) + thread, thread_completed = threads.get(symbol, (None, None)) if thread: + thread_completed.set() # Signal thread completion thread.join() threads.pop(symbol, None) @@ -465,15 +442,16 @@ def start_thread_for_symbol(symbol, args, manager): """Start a new thread for a given symbol.""" logging.info(f"Starting thread for symbol: {symbol}") try: - new_thread = threading.Thread(target=run_bot, args=(symbol, args, manager, args.account_name, symbols_allowed, latest_rotator_symbols)) + thread_completed = threading.Event() + new_thread = threading.Thread(target=run_bot, args=(symbol, args, manager, args.account_name, symbols_allowed, latest_rotator_symbols, thread_completed)) new_thread.start() - threads[symbol] = new_thread + threads[symbol] = (new_thread, thread_completed) thread_start_time[symbol] = time.time() return True # Successfully started thread except Exception as e: logging.error(f"Error starting thread for symbol {symbol}: {e}") return False # Failed to start thread - + def fetch_updated_symbols(args, manager): """Fetches and logs potential symbols based on the current trading strategy.""" strategy = args.strategy.lower()