From 87c324189a1dd78fed0b06e502e10eba4ae1db58 Mon Sep 17 00:00:00 2001 From: Dreu LaVelle Date: Thu, 25 Jul 2024 23:20:49 -0500 Subject: [PATCH] fix: add extra logging to track issue. added mutex to add_to_running --- src/program/db/db_functions.py | 2 +- src/program/program.py | 67 ++++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 24 deletions(-) diff --git a/src/program/db/db_functions.py b/src/program/db/db_functions.py index 431c6d6d..451f0493 100644 --- a/src/program/db/db_functions.py +++ b/src/program/db/db_functions.py @@ -132,4 +132,4 @@ def run_delete(_type): run_delete(MediaItem) logger.log("PROGRAM", "Database reset. Turning off HARD_RESET Env Var.") - os.environ["HARD_RESET"] = "False" + os.environ.pop('HARD_RESET', None) \ No newline at end of file diff --git a/src/program/program.py b/src/program/program.py index e1ac5d5f..4beb8b5e 100644 --- a/src/program/program.py +++ b/src/program/program.py @@ -235,30 +235,47 @@ def _schedule_services(self) -> None: logger.log("PROGRAM", f"Scheduled {service_cls.__name__} to run every {update_interval} seconds.") def _id_in_queue(self, id): + if id is None: + return False return any(i._id == id for i in self.queued_items) def _id_in_running_items(self, id): + if id is None: + return False return any(i._id == id for i in self.running_items) - def _push_event_queue(self, event): + def _push_event_queue(self, event) -> bool: with self.mutex: - if( event.item not in self.queued_items and event.item not in self.running_items): - if hasattr(event.item, "_id") and event.item._id is not None: - if isinstance(event.item, Show): - for s in event.item.seasons: - if s._id and (self._id_in_queue(s._id) or self._id_in_running_items(s._id)): - return None - for e in s.episodes: - if e._id and (self._id_in_queue(e._id) or self._id_in_running_items(e._id)): - return None - if isinstance(event.item, Season): - for e in event.item.episodes: - if self._id_in_queue(e._id) or self._id_in_running_items(e._id): - return None - if hasattr(event.item, "parent") and ( self._id_in_queue(event.item.parent._id) or self._id_in_running_items(event.item.parent._id) ): - return None - if hasattr(event.item, "parent") and hasattr(event.item.parent, "parent") and event.item.parent.parent and ( self._id_in_queue(event.item.parent.parent._id) or self._id_in_running_items(event.item.parent.parent._id)): - return None + if event.item in self.queued_items or event.item in self.running_items: + logger.debug(f"Item {event.item.log_string} is already in the queue or running, skipping.") + return False + + if hasattr(event.item, "_id") and event.item._id is not None: + if isinstance(event.item, Show): + for season in event.item.seasons: + if season._id and (self._id_in_queue(season._id) or self._id_in_running_items(season._id)): + logger.debug(f"Season {season.log_string} of show {event.item.log_string} is already in the queue or running, skipping.") + return False + for episode in season.episodes: + if episode._id and (self._id_in_queue(episode._id) or self._id_in_running_items(episode._id)): + logger.debug(f"Episode {episode.log_string} of season {season.log_string} is already in the queue or running, skipping.") + return False + elif isinstance(event.item, Season): + for episode in event.item.episodes: + if self._id_in_queue(episode._id) or self._id_in_running_items(episode._id): + logger.debug(f"Episode {episode.log_string} of season {event.item.log_string} is already in the queue or running, skipping.") + return False + elif hasattr(event.item, "parent"): + parent = event.item.parent + if self._id_in_queue(parent._id) or self._id_in_running_items(parent._id): + logger.debug(f"Parent {parent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.") + return False + if hasattr(parent, "parent") and parent.parent: + grandparent = parent.parent + if self._id_in_queue(grandparent._id) or self._id_in_running_items(grandparent._id): + logger.debug(f"Grandparent {grandparent.log_string} of item {event.item.log_string} is already in the queue or running, skipping.") + return False + self.queued_items.append(event.item) self.event_queue.put(event) if not isinstance(event.item, (Show, Movie, Episode, Season)): @@ -278,15 +295,15 @@ def _remove_from_running_items(self, item, service_name=""): with self.mutex: if item in self.running_items: self.running_items.remove(item) - logger.log("PROGRAM", f"Item {item.log_string} finished running section {service_name}" ) + logger.log("PROGRAM", f"Item {item.log_string} finished running section {service_name} with state {item.state.value}" ) def add_to_running(self, item, service_name): if item is None: return - if item not in self.running_items: - if isinstance(item, MediaItem) and not self._id_in_running_items(item._id) or not isinstance(item, MediaItem): + with self.mutex: + if item not in self.running_items: self.running_items.append(item) - logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name}" ) + logger.log("PROGRAM", f"Item {item.log_string} started running section {service_name} with state {item.state.value}") def _process_future_item(self, future: Future, service: Service, orig_item: MediaItem) -> None: """Callback to add the results from a future emitted by a service to the event queue.""" @@ -378,8 +395,9 @@ def run(self): except Empty: self.dump_tracemalloc() continue + with db.Session() as session: - existing_item = DB._get_item_from_db(session, event.item) + existing_item: MediaItem | None = DB._get_item_from_db(session, event.item) updated_item, next_service, items_to_submit = process_event( existing_item, event.emitted_by, existing_item if existing_item is not None else event.item ) @@ -391,10 +409,13 @@ def run(self): if items_to_submit: for item_to_submit in items_to_submit: + logger.debug(f"Submitting {item_to_submit.log_string} to {next_service.__name__}") self.add_to_running(item_to_submit, next_service.__name__) self._submit_job(next_service, item_to_submit) if isinstance(existing_item, MediaItem): + logger.debug(f"Storing state of {existing_item.log_string}") existing_item.store_state() + logger.debug(f"Committing changes to the database") session.commit() def stop(self):