Skip to content

Commit

Permalink
I am sleep coding
Browse files Browse the repository at this point in the history
  • Loading branch information
donewiththedollar committed Apr 17, 2024
1 parent 7cae8a3 commit db94bf5
Showing 1 changed file with 33 additions and 55 deletions.
88 changes: 33 additions & 55 deletions multi_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,21 +281,20 @@ def get_symbols(self):

orders_canceled = False

def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbols_standardized):
def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbols_standardized, thread_completed):
global orders_canceled
current_thread = threading.current_thread()

try:
with thread_to_symbol_lock:
thread_to_symbol[current_thread] = symbol

time.sleep(1)

# Correct the path for the configuration file
if not args.config.startswith('configs/'):
config_file_path = Path('configs/' + args.config)
else:
config_file_path = Path(args.config)

print("Loading config from:", config_file_path)
config = load_config(config_file_path)

Expand All @@ -310,21 +309,21 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol
print(f"Trading symbol: {symbol}")
print(f"Exchange name: {exchange_name}")
print(f"Strategy name: {strategy_name}")
print(f"Account name: {account_name}")
print(f"Account name: {account_name}")

# Pass account_name to DirectionalMarketMaker constructor
market_maker = DirectionalMarketMaker(config, exchange_name, account_name)
market_maker.manager = manager

try:
# Cancel all open orders at the startup of the first thread only
if not orders_canceled and hasattr(market_maker.exchange, 'cancel_all_open_orders_bybit'):
market_maker.exchange.cancel_all_open_orders_bybit()
logging.info(f"Cleared all open orders on the exchange upon initialization.")
orders_canceled = True # Set the flag to True to prevent future cancellations
except Exception as e:
logging.info(f"Excetion caught {e}")
logging.info(f"Exception caught {e}")

market_maker.run_strategy(symbol, args.strategy, config, account_name, symbols_to_trade=symbols_allowed, rotator_symbols_standardized=rotator_symbols_standardized)

quote = "USDT"
Expand All @@ -339,56 +338,20 @@ def run_bot(symbol, args, manager, account_name, symbols_allowed, rotator_symbol
# cached_balance = market_maker.get_balance(quote)
# print(f"Futures balance: {cached_balance}")
# last_balance_fetch_time = current_time

# Signal thread completion
thread_completed.set()

except Exception as e:
logging.error(f"An error occurred in run_bot for symbol {symbol}: {e}")
thread_completed.set() # Signal thread completion even in case of an exception

finally:
with thread_to_symbol_lock:
if current_thread in thread_to_symbol:
del thread_to_symbol[current_thread]
logging.info(f"Thread for symbol {symbol} has completed.")

def rotate_inactive_symbols(active_symbols, rotator_symbols_queue, thread_start_time, rotation_threshold=160, max_symbols_allowed=5):
current_time = time.time()
rotated_out_symbols = []
added_symbols = []

for symbol in list(active_symbols):
if current_time - thread_start_time.get(symbol, 0) > rotation_threshold:
active_symbols.remove(symbol)
del thread_start_time[symbol] # Remove symbol from thread_start_time tracking
rotated_out_symbols.append(symbol)

# Add new symbol from the rotator queue if it doesn't exceed max_symbols_allowed
while len(rotator_symbols_queue) > 0 and len(active_symbols) < max_symbols_allowed:
new_symbol = rotator_symbols_queue.popleft() # Get the next symbol from the queue
if new_symbol not in active_symbols:
active_symbols.add(new_symbol)
thread_start_time[new_symbol] = current_time
added_symbols.append(new_symbol)
rotator_symbols_queue.append(new_symbol) # Add it back to the end of the queue
break

if rotated_out_symbols:
logging.info(f"Rotated out symbols: {rotated_out_symbols}")
if added_symbols:
logging.info(f"Added new symbols: {added_symbols}")

return active_symbols, thread_start_time


# Define the update function for the rotator queue
def update_rotator_queue(rotator_queue, latest_symbols):
# Convert the queue to a set for efficient operations
rotator_set = set(rotator_queue)
# Add new symbols to the set
rotator_set.update(latest_symbols)
# Remove symbols no longer in the latest list
rotator_set.intersection_update(latest_symbols)
time.sleep(1)
# Return a new deque from the updated set
return deque(rotator_set)

def bybit_auto_rotation(args, manager, symbols_allowed):
global latest_rotator_symbols, threads, active_symbols, last_rotator_update_time

Expand Down Expand Up @@ -417,18 +380,31 @@ def bybit_auto_rotation(args, manager, symbols_allowed):

# Handle new symbols from the rotator within the allowed limits
manage_rotator_symbols(latest_rotator_symbols, args, manager, symbols_allowed)

# Check for completed threads and perform cleanup
completed_symbols = []
for symbol, (thread, thread_completed) in threads.items():
if thread_completed.is_set():
thread.join() # Wait for the thread to complete
completed_symbols.append(symbol)

# Remove completed symbols from active_symbols and threads
for symbol in completed_symbols:
active_symbols.discard(symbol)
del threads[symbol]
del thread_start_time[symbol]

except Exception as e:
logging.error(f"Exception caught in bybit_auto_rotation: {str(e)}")

def update_active_symbols():
global active_symbols
active_symbols = {symbol for symbol in active_symbols if symbol in threads and threads[symbol].is_alive()}
active_symbols = {symbol for symbol in active_symbols if symbol in threads and threads[symbol][0].is_alive()}

def update_active_threads(open_position_symbols, args, manager, symbols_allowed):
global active_symbols
for symbol in open_position_symbols:
if symbol not in active_symbols or (symbol in threads and not threads[symbol].is_alive()):
if symbol not in active_symbols or (symbol in threads and not threads[symbol][0].is_alive()):
if start_thread_for_symbol(symbol, args, manager):
active_symbols.add(symbol)
logging.info(f"Started or restarted thread for symbol: {symbol}")
Expand All @@ -440,7 +416,7 @@ def manage_rotator_symbols(rotator_symbols, args, manager, symbols_allowed):
for symbol in rotator_symbols:
if needed_slots <= 0:
break
if symbol not in active_symbols and (symbol not in threads or not threads[symbol].is_alive()):
if symbol not in active_symbols and (symbol not in threads or not threads[symbol][0].is_alive()):
if start_thread_for_symbol(symbol, args, manager):
active_symbols.add(symbol)
needed_slots -= 1
Expand All @@ -456,24 +432,26 @@ def manage_excess_threads(symbols_allowed):

def remove_thread_for_symbol(symbol):
"""Safely removes a thread associated with a symbol."""
thread = threads.get(symbol)
thread, thread_completed = threads.get(symbol, (None, None))
if thread:
thread_completed.set() # Signal thread completion
thread.join()
threads.pop(symbol, None)

def start_thread_for_symbol(symbol, args, manager):
"""Start a new thread for a given symbol."""
logging.info(f"Starting thread for symbol: {symbol}")
try:
new_thread = threading.Thread(target=run_bot, args=(symbol, args, manager, args.account_name, symbols_allowed, latest_rotator_symbols))
thread_completed = threading.Event()
new_thread = threading.Thread(target=run_bot, args=(symbol, args, manager, args.account_name, symbols_allowed, latest_rotator_symbols, thread_completed))
new_thread.start()
threads[symbol] = new_thread
threads[symbol] = (new_thread, thread_completed)
thread_start_time[symbol] = time.time()
return True # Successfully started thread
except Exception as e:
logging.error(f"Error starting thread for symbol {symbol}: {e}")
return False # Failed to start thread

def fetch_updated_symbols(args, manager):
"""Fetches and logs potential symbols based on the current trading strategy."""
strategy = args.strategy.lower()
Expand Down

0 comments on commit db94bf5

Please sign in to comment.