diff --git a/resources/src/ai/trainer.py b/resources/src/ai/trainer.py index 08b3b60..9ca041b 100644 --- a/resources/src/ai/trainer.py +++ b/resources/src/ai/trainer.py @@ -138,7 +138,7 @@ def train(self, raw_data, epochs=20, batch_size=32, backup_path=None): backup_path (None or str): path to where the backups should be saved. """ if backup_path is None: - backup_path = "./backups/" + backup_path = "resources/src/ai/backups/" date = datetime.now().strftime("%y-%m-%dT%H:%M") self.save_model(f"{backup_path}{date}.keras",f"{backup_path}{date}.ini") prep_data = self.prepare_data_for_training(raw_data) diff --git a/resources/src/config.ini b/resources/src/config.ini index 806bedc..be13bb8 100644 --- a/resources/src/config.ini +++ b/resources/src/config.ini @@ -13,7 +13,7 @@ metric=bytes schedule_hour=* * * * * epochs=20 batch_size=32 -backup_path=./backups/ +backup_path=resources/src/ai/backups/ #target_sensors=FlowSensor model_names=traffic diff --git a/resources/src/redborder/zookeeper/rb_outliers_zoo_sync.py b/resources/src/redborder/zookeeper/rb_outliers_zoo_sync.py index cd14c1a..582b80a 100644 --- a/resources/src/redborder/zookeeper/rb_outliers_zoo_sync.py +++ b/resources/src/redborder/zookeeper/rb_outliers_zoo_sync.py @@ -18,7 +18,6 @@ # Because distributed systems are a zoo...... import os -import random import time from kazoo.recipe.election import Election from kazoo.recipe.queue import LockingQueue @@ -45,7 +44,7 @@ def __init__(self) -> None: configurations, including the ZooKeeper client and S3 client. """ self.is_leader = False - self.running = False + self.is_running = False self.s3_client = None self.queue = None self.election = None @@ -89,7 +88,7 @@ def sync_nodes(self) -> None: logger.info("Synchronizing nodes") self.setup_s3() self._ensure_paths() - self.running = True + self.is_running = True self.queue = LockingQueue(self.zookeeper, self.paths["queue"]) self.election = Election(self.zookeeper, self.paths["election"], identifier=self.name) self.leader_watcher = ChildrenWatch(self.zookeeper, self.paths["leader"], self._participate_in_election) @@ -104,7 +103,7 @@ def cleanup(self, signum: int, frame) -> None: frame: The current stack frame. """ logger.info(f"Cleanup called with signal {signum}") - self.running = False + self.is_running = False self.election.cancel() self.leader_watcher._stopped = True if self.is_leader: @@ -117,7 +116,7 @@ def _run_tasks(self) -> None: """ Runs tasks based on the leadership status. """ - while self.running: + while self.is_running: if self.is_leader: self._leader_tasks() else: @@ -129,7 +128,7 @@ def _leader_tasks(self) -> None: Runs the tasks for the leader node. """ logger.info("Running leader tasks") - while self.is_leader and self.running: + while self.is_leader and self.is_running: self._get_models() self._locks_models_on_zoo() next_task_time = time.time() + self.sleep_time @@ -149,13 +148,15 @@ def _follower_tasks(self) -> None: Runs the tasks for the follower nodes. """ logger.info("Running follower tasks") - while self.running: - while not self.is_leader and self._leader_exists(): - model = self._get_model_from_queue() - if model: - self._process_model_as_follower(model) - time.sleep(2) - time.sleep(2) + while self.is_running and not self.is_leader: + if not self._leader_exists(): + logger.info("No leader found, waiting...") + time.sleep(5) + continue + model = self._get_model_from_queue() + if model: + self._process_model_as_follower(model) + time.sleep(5) def _participate_in_election(self, leader_nodes: list[str]) -> None: """ @@ -164,7 +165,7 @@ def _participate_in_election(self, leader_nodes: list[str]) -> None: Parameters: leader_nodes (list[str]): A list of leader nodes. """ - if not self._leader_exists() and self.running: + if not self._leader_exists() and self.is_running: logger.info("Participating in election") try: if self.election.lock.acquire(timeout=10): @@ -222,7 +223,7 @@ def _locks_models_on_zoo(self) -> None: """ Locks the models in ZooKeeper. """ - if self.is_leader and self.running: + if self.is_leader and self.is_running: b_models = [bytes(model, "utf-8") for model in self.models] self.queue.put_all(b_models) logger.info(f"Locked models {', '.join(self.models)}") @@ -260,5 +261,4 @@ def _process_model_as_follower(self, model: str) -> None: logger.info(f"Finished training of model {model}") except Exception as e: logger.error(f"Client {self.name} failed to process {model}: {e}") - self.queue.put(bytes(model, "utf-8")) - logger.error(f"Client {self.name} requeued {model}") + self._delete_node(self.paths["taken"], model)