refactor: downloader, realdebrid, state transitions, symlink module (exponential backoff)

chore: add run_at attr to event types to specify next run time and include event with futures
fix: get all content from content services (previously only one item was picked)
fix: trakt indexer not picking up shows
chore: stop spamming retry count in logs
chore: remove whitespaces from modified files
chore: remove unnecessary logging and change some log levels
fix: remove local updater and stop possibility of looping with symlinked state
fix: validate subtitle providers on init, remove addic7ed and napiprojekt providers
feat: "Ongoing" and "Unreleased" states for shows
fix: event updates for frontend
fix: trakt indexing was not copying correct item attributes in previous release
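
Note: the "exponential backoff" in the commit title is not visible in the diffs rendered below. Combined with the new run_at attribute on event types, the idea is to push each retry further into the future instead of hammering a service at a fixed interval. A minimal sketch of such a scheme, assuming a per-item retry counter; next_run_at, base, and cap are illustrative names, not identifiers from this commit:

    import random
    from datetime import datetime, timedelta

    def next_run_at(retry_count: int, base: float = 60.0, cap: float = 3600.0) -> datetime:
        """Exponential backoff with jitter: 60s, 120s, 240s, ... capped at one hour."""
        delay = min(cap, base * (2 ** retry_count))  # double the wait on every retry
        delay *= random.uniform(0.8, 1.2)            # jitter spreads retries apart
        return datetime.now() + timedelta(seconds=delay)

An event whose run_at is set this way is simply skipped by the scheduler until the computed time has passed.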
Gaisberg authored and Gaisberg committed Aug 30, 2024
1 parent 71012ef commit 6ee4742
Showing 30 changed files with 1,033 additions and 1,536 deletions.
403 changes: 209 additions & 194 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion src/program/content/listrr.py
@@ -60,13 +60,15 @@ def validate(self) -> bool:
 
     def run(self) -> Generator[MediaItem, None, None]:
         """Fetch new media from `Listrr`"""
+        items_to_yield = []
         self.not_found_ids.clear()
         movie_items = self._get_items_from_Listrr("Movies", self.settings.movie_lists)
         show_items = self._get_items_from_Listrr("Shows", self.settings.show_lists)
         for imdb_id in movie_items + show_items:
             if imdb_id not in self.recurring_items:
                 self.recurring_items.add(imdb_id)
-                yield MediaItem({"imdb_id": imdb_id, "requested_by": self.key})
+                items_to_yield.append(MediaItem({"imdb_id": imdb_id, "requested_by": self.key}))
+        yield items_to_yield
 
     def _get_items_from_Listrr(self, content_type, content_lists) -> list[MediaItem]:  # noqa: C901, PLR0912
         """Fetch unique IMDb IDs from Listrr for a given type and list of content."""
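
The listrr change above is the first instance of a pattern repeated in every content service in this commit: collect the new MediaItems and yield them as a single list per run, so one pass picks up all content instead of a single item (the "previously only one item was picked" fix). A sketch of the consuming side under that convention; drain_content_service is an illustrative helper, not code from the commit, but the list handling mirrors the elif isinstance(i, list) branch added to db_functions.py further down:

    from typing import Callable, Iterable

    def drain_content_service(run: Callable[[], Iterable[list]], add_item: Callable[[object], None]) -> int:
        """Consume a content service whose run() yields one list of items per pass."""
        queued = 0
        for batch in run():                # each yielded value is a whole batch
            for media_item in batch:       # every element is queued, none dropped
                add_item(media_item)
                queued += 1
        return queued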
8 changes: 4 additions & 4 deletions src/program/content/mdblist.py
@@ -42,7 +42,7 @@ def validate(self):
     def run(self) -> Generator[MediaItem, None, None]:
         """Fetch media from mdblist and add them to media_items attribute
         if they are not already there"""
-
+        items_to_yield = []
         try:
             with self.rate_limiter:
                 for list in self.settings.lists:
@@ -57,13 +57,13 @@ def run(self) -> Generator[MediaItem, None, None]:
                         # Check if the item is already completed in the media container
                         if item.imdb_id and item.imdb_id not in self.recurring_items:
                             self.recurring_items.add(item.imdb_id)
-                            yield MediaItem(
+                            items_to_yield.append(MediaItem(
                                 {"imdb_id": item.imdb_id, "requested_by": self.key}
-                            )
+                            ))
 
         except RateLimitExceeded:
-            pass
+            return
+        yield items_to_yield
 
     def _calculate_request_time(self):
         limits = my_limits(self.settings.api_key).limits
5 changes: 4 additions & 1 deletion src/program/content/overseerr.py
@@ -80,6 +80,7 @@ def run(self):
             for item in response.data.results
             if item.status == 2 and item.media.status == 3
         ]
+        items_to_yield = []
         for item in pending_items:
             try:
                 mediaId: int = int(item.media.id)
@@ -92,7 +93,7 @@ def run(self):
                     self.recurring_items.add(imdb_id)
                     media_item = MediaItem({"imdb_id": imdb_id, "requested_by": self.key, "overseerr_id": mediaId, "requested_id": item.id})
                     if media_item:
-                        yield media_item
+                        items_to_yield.append(media_item)
                     else:
                         logger.log("NOT_FOUND", f"Failed to create media item for {imdb_id}")
             except Exception as e:
@@ -102,6 +103,8 @@ def run(self):
         if self.settings.use_webhook:
             self.run_once = True
 
+        yield items_to_yield
+
     def get_imdb_id(self, data) -> str:
         """Get imdbId for item from overseerr"""
         if data.mediaType == "show":
9 changes: 4 additions & 5 deletions src/program/content/plex_watchlist.py
@@ -65,6 +65,7 @@ def validate(self):
 
     def run(self) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
         """Fetch new media from `Plex Watchlist` and RSS feed if enabled."""
+        items_to_yield = []
         try:
             watchlist_items = set(self._get_items_from_watchlist())
             rss_items = set(self._get_items_from_rss()) if self.rss_enabled else set()
@@ -78,11 +79,9 @@ def run(self) -> Generator[Union[Movie, Show, Season, Episode], None, None]:
             if not imdb_id or imdb_id in self.recurring_items:
                 continue
             self.recurring_items.add(imdb_id)
-            media_item = MediaItem({"imdb_id": imdb_id, "requested_by": self.key})
-            if media_item:
-                yield media_item
-            else:
-                logger.log("NOT_FOUND", f"Failed to create media item for {imdb_id}")
+            items_to_yield.append(MediaItem({"imdb_id": imdb_id, "requested_by": self.key}))
+
+        yield items_to_yield
 
 
     def _get_items_from_rss(self) -> Generator[MediaItem, None, None]:
13 changes: 8 additions & 5 deletions src/program/content/trakt.py
@@ -74,6 +74,7 @@ def missing(self):
     def run(self):
         """Fetch media from Trakt and yield Movie, Show, or MediaItem instances."""
         current_time = time.time()
+        items_to_yield = []
         if current_time < self.next_run_time:
             return
 
@@ -103,17 +104,19 @@
                     self.items_already_seen.add(imdb_id)
                     new_items_count += 1
 
-                    yield MediaItem({
+                    items_to_yield.append(MediaItem({
                         "imdb_id": imdb_id,
                         "requested_by": self.key
-                    })
+                    }))
 
             if new_items_count > 0:
                 logger.log("TRAKT", f"New items fetched from {source}: {new_items_count}")
             total_new_items += new_items_count
         if total_new_items > 0:
             logger.log("TRAKT", f"Total new items fetched: {total_new_items}")
 
+        yield items_to_yield
+
     def _get_watchlist(self, watchlist_users: list) -> list:
         """Get IMDb IDs from Trakt watchlist"""
         if not watchlist_users:
@@ -146,7 +149,7 @@ def _get_list(self, list_items: list) -> list:
             if not user or not list_name:
                 logger.error(f"Invalid list URL: {url}")
                 continue
-
+
             items = get_user_list(self.api_url, self.headers, user, list_name)
             for item in items:
                 if hasattr(item, "movie"):
@@ -163,13 +166,13 @@ def _get_trending_items(self) -> list:
         """Get IMDb IDs from Trakt trending items"""
         trending_movies = get_trending_items(self.api_url, self.headers, "movies", self.settings.trending_count)
         trending_shows = get_trending_items(self.api_url, self.headers, "shows", self.settings.trending_count)
-        return self._extract_imdb_ids(trending_movies + trending_shows)
+        return self._extract_imdb_ids(trending_movies[:self.settings.trending_count] + trending_shows[:self.settings.trending_count])
 
     def _get_popular_items(self) -> list:
         """Get IMDb IDs from Trakt popular items"""
         popular_movies = get_popular_items(self.api_url, self.headers, "movies", self.settings.popular_count)
         popular_shows = get_popular_items(self.api_url, self.headers, "shows", self.settings.popular_count)
-        return self._extract_imdb_ids_with_none_type(popular_movies + popular_shows)
+        return self._extract_imdb_ids_with_none_type(popular_movies[:self.settings.popular_count] + popular_shows[:self.settings.popular_count])
 
     def _extract_imdb_ids(self, items: list) -> list:
         """Extract IMDb IDs and types from a list of items"""
36 changes: 21 additions & 15 deletions src/program/db/db_functions.py
@@ -128,10 +128,10 @@ def blacklist_stream(item: "MediaItem", stream: Stream, session: Session = None)
     close_session = False
     if session is None:
         session = db.Session()
+        item = session.execute(select(type(item)).where(type(item)._id == item._id)).unique().scalar_one()
         close_session = True
 
     try:
-        item.store_state()
         item = session.merge(item)
         association_exists = session.query(
             session.query(StreamRelation)
@@ -150,7 +150,7 @@ def blacklist_stream(item: "MediaItem", stream: Stream, session: Session = None)
                 insert(StreamBlacklistRelation)
                 .values(media_item_id=item._id, stream_id=stream._id)
             )
-
+            item.store_state()
             session.commit()
             return True
         return False
@@ -222,7 +222,8 @@ def _get_item_ids(session, item):
                 .where(Episode.parent_id == season_id)
             ).scalars().all()
         return season_id, episode_ids
-
+    elif item.type == "episode":
+        return item._id, []
     elif hasattr(item, "parent"):
         parent_id = item.parent._id
         return parent_id, []
@@ -306,16 +307,19 @@ def _check_for_and_run_insertion_required(session, item: "MediaItem") -> bool:
 
 def _run_thread_with_db_item(fn, service, program, input_item: "MediaItem" = None):
     from program.media.item import MediaItem, Movie, Show, Season, Episode
-    if input_item is not None:
+    if input_item:
         with db.Session() as session:
             if isinstance(input_item, (Movie, Show, Season, Episode)):
                 if not _check_for_and_run_insertion_required(session, input_item):
                     pass
                 input_item = _get_item_from_db(session, input_item)
 
                 for res in fn(input_item):
-                    if not isinstance(res, MediaItem):
-                        logger.log("PROGRAM", f"Service {service.__name__} emitted {res} from input item {input_item} of type {type(res).__name__}, backing off.")
+                    if isinstance(res, tuple):
+                        item, _ = res
+                    else:
+                        item = res
+                    if not isinstance(item, MediaItem):
+                        logger.log("PROGRAM", f"Service {service.__name__} emitted {item} from input item {input_item} of type {type(item).__name__}, backing off.")
                     program.em.remove_item_from_running(input_item)
 
                     input_item.store_state()
@@ -328,34 +332,36 @@ def _run_thread_with_db_item(fn, service, program, input_item: "MediaItem" = None):
             for i in fn(input_item):
                 if isinstance(i, (MediaItem)):
                     with db.Session() as session:
-                        _check_for_and_run_insertion_required(session, i)
+                        _check_for_and_run_insertion_required(session, i)
                 yield i
             return
     else:
         # Content services
         for i in fn():
             if isinstance(i, (MediaItem)):
-                with db.Session() as session:
-                    _check_for_and_run_insertion_required(session, i)
-                yield i
+                program.em.add_item(i, service)
+            elif isinstance(i, list) and all(isinstance(item, MediaItem) for item in i):
+                for item in i:
+                    program.em.add_item(item, service)
         return
 
 def hard_reset_database():
     """Resets the database to a fresh state."""
     logger.log("DATABASE", "Resetting Database")
 
     # Drop all tables
     db.Model.metadata.drop_all(db.engine)
     logger.log("DATABASE","All MediaItem tables dropped")
 
     # Drop the alembic_version table
     with db.engine.connect() as connection:
         connection.execute(text("DROP TABLE IF EXISTS alembic_version CASCADE"))
     logger.log("DATABASE","Alembic table dropped")
 
     # Recreate all tables
     db.Model.metadata.create_all(db.engine)
     logger.log("DATABASE","All tables recreated")
 
     # Reinitialize Alembic
     logger.log("DATABASE","Removing Alembic Directory")
     shutil.rmtree(alembic_dir, ignore_errors=True)
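
The tuple unpacking added to _run_thread_with_db_item above pairs with the run_at attribute from the commit message: by this reading, a service may yield either a bare MediaItem or an (item, run_at) tuple, with the second element telling the scheduler when to run the item next. A hedged sketch of a service following that convention; the 30-minute window is an invented example, not a value from this commit:

    from datetime import datetime, timedelta

    def run(item):
        if not item.streams:
            # Nothing usable yet; hand back a time for the scheduler to retry.
            yield item, datetime.now() + timedelta(minutes=30)
        else:
            yield item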
109 changes: 93 additions & 16 deletions src/program/downloaders/__init__.py
@@ -4,31 +4,108 @@
 from .alldebrid import AllDebridDownloader
 from .realdebrid import RealDebridDownloader
 from .torbox import TorBoxDownloader
-from ..media import States
+from .shared import get_needed_media
+from concurrent.futures import CancelledError, ThreadPoolExecutor, as_completed
 
 
 class Downloader:
     def __init__(self):
         self.key = "downloader"
         self.initialized = False
-        self.services = {
-            RealDebridDownloader: RealDebridDownloader(),
-            TorBoxDownloader: TorBoxDownloader(),
-            AllDebridDownloader: AllDebridDownloader(),
-        }
+        self.service = next((service for service in [
+            RealDebridDownloader(),
+            #AllDebridDownloader(),
+            #TorBoxDownloader()
+        ] if service.initialized), None)
 
         self.initialized = self.validate()
 
-    @property
-    def service(self):
-        return next(service for service in self.services.values() if service.initialized)
-
     def validate(self):
-        initialized_services = [service for service in self.services.values() if service.initialized]
-        if len(initialized_services) > 1:
-            logger.error("More than one downloader service is initialized. Only one downloader can be initialized at a time.")
+        if self.service is None:
+            logger.error("No downloader service is initialized. Please initialize a downloader service.")
             return False
-        return len(initialized_services) == 1
+        return True
 
     def run(self, item: MediaItem):
-        self.service.run(item)
-        yield item
+        logger.debug(f"Running downloader for {item.log_string}")
+        needed_media = get_needed_media(item)
+        hashes = [stream.infohash for stream in item.streams if stream.infohash not in self.service.existing_hashes]
+        cached_streams = self.get_cached_streams(hashes, needed_media)
+        if cached_streams:
+            item.active_stream = cached_streams[0]
+            try:
+                self.download(item, item.active_stream)
+            except Exception as e:
+                logger.error(f"Failed to download {item.log_string}: {e}")
+                self._delete_and_reset_active_stream(item)
+        else:
+            for stream in item.streams:
+                item.blacklist_stream(stream)
+            logger.log("DEBRID", f"No cached torrents found for {item.log_string}")
+        yield item
+
+    def _delete_and_reset_active_stream(self, item: MediaItem):
+        self.service.existing_hashes.remove(item.active_stream["infohash"])
+        self.service.delete_torrent_with_infohash(item.active_stream["infohash"])
+        stream = next(stream for stream in item.streams if stream.infohash == item.active_stream["infohash"])
+        item.active_stream = {}
+        item.blacklist_stream(stream)
+
+    def get_cached_streams(self, hashes: list[str], needed_media, break_on_first = True) -> dict:
+        chunks = [hashes[i:i + 5] for i in range(0, len(hashes), 5)]
+        # Using a list to share the state, booleans are immutable
+        break_pointer = [False, break_on_first]
+        results = []
+
+        with ThreadPoolExecutor(thread_name_prefix="Downloader") as executor:
+            futures = []
+            for chunk in chunks:
+                future = executor.submit(self.service.process_hashes, chunk, needed_media, break_pointer)
+                futures.append(future)
+
+            for future in as_completed(futures):
+                try:
+                    _result = future.result()
+                except CancelledError:
+                    continue
+                if isinstance(_result, dict):
+                    results.append(_result)
+                    if break_on_first:
+                        for future in futures:
+                            future.cancel()
+
+        return results
+
+    def download(self, item, active_stream: dict) -> str:
+        torrent_id = self.service.download_cached(active_stream)
+        torrent_names = self.service.get_torrent_names(torrent_id)
+        update_item_attributes(item, torrent_names)
+        logger.log("DEBRID", f"Downloaded {item.log_string}")
+
+
+def update_item_attributes(item: MediaItem, names: tuple[str, str]):
+    """Update the item attributes with the downloaded files and active stream"""
+    matches_dict = item.active_stream.get("matched_files")
+    item.folder = names[0]
+    item.alternative_folder = names[1]
+    stream = next((stream for stream in item.streams if stream.infohash == item.active_stream["infohash"]), None)
+    item.active_stream["name"] = stream.raw_title
+
+    if item.type in ["movie", "episode"]:
+        item.file = next(file["filename"] for file in next(iter(matches_dict.values())).values())
+    elif item.type == "show":
+        for season in item.seasons:
+            for episode in season.episodes:
+                file = matches_dict.get(season.number, {}).get(episode.number, {})
+                if file:
+                    episode.file = file["filename"]
+                    episode.folder = item.folder
+                    episode.alternative_folder = item.alternative_folder
+                    episode.active_stream = {**item.active_stream, "files": [episode.file]}
+    elif item.type == "season":
+        for episode in item.episodes:
+            file = matches_dict.get(item.number, {}).get(episode.number, {})
+            if file:
+                episode.file = file["filename"]
+                episode.folder = item.folder
+                episode.alternative_folder = item.alternative_folder
+                episode.active_stream = {**item.active_stream, "files": [episode.file]}
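
The break_pointer list in get_cached_streams is the standard workaround its inline comment alludes to: rebinding a bare boolean inside a worker thread only changes a local name, whereas mutating a shared list is visible to every thread. A self-contained sketch of the pattern; the probe worker and its timings are illustrative only:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def probe(checks: int, break_pointer: list) -> str | None:
        for _ in range(checks):
            if break_pointer[0]:       # another thread already found a hit
                return None
            time.sleep(0.01)           # stand-in for a cache-availability request
        break_pointer[0] = True        # first finisher tells the others to stop
        return f"finished after {checks} checks"

    break_pointer = [False]
    with ThreadPoolExecutor(thread_name_prefix="Downloader") as executor:
        futures = [executor.submit(probe, n, break_pointer) for n in (3, 30, 300)]
    print([f.result() for f in futures])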
4 changes: 2 additions & 2 deletions src/program/downloaders/alldebrid.py
@@ -366,8 +366,8 @@ def _is_wanted_show(self, files: list, item: Show) -> bool:
         acceptable_states = [States.Indexed, States.Scraped, States.Unknown, States.Failed]
 
         for season in item.seasons:
-            if season.state in acceptable_states and season.is_released_nolog:
-                needed_episode_numbers = {episode.number for episode in season.episodes if episode.state in acceptable_states and episode.is_released_nolog}
+            if season.state in acceptable_states and season.is_released:
+                needed_episode_numbers = {episode.number for episode in season.episodes if episode.state in acceptable_states and episode.is_released}
                 if needed_episode_numbers:
                     needed_episodes[season.number] = needed_episode_numbers
         if not needed_episodes:
