Skip to content

Commit cc68fa5

Browse files
authored
Filesystem matching and metadata fixes (#1625)
1 parent 78d065b commit cc68fa5

File tree

8 files changed

+1151
-1308
lines changed

8 files changed

+1151
-1308
lines changed

music_assistant/server/controllers/metadata.py

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -428,21 +428,8 @@ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = Fa
428428
local_provs = get_global_cache_value("non_streaming_providers")
429429
if TYPE_CHECKING:
430430
local_provs = cast(set[str], local_provs)
431-
for prov_mapping in artist.provider_mappings:
432-
if prov_mapping.provider_instance not in local_provs:
433-
continue
434-
if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
435-
continue
436-
if prov.lookup_key in unique_keys:
437-
continue
438-
unique_keys.add(prov.lookup_key)
439-
with suppress(MediaNotFoundError):
440-
prov_item = await self.mass.music.artists.get_provider_item(
441-
prov_mapping.item_id, prov_mapping.provider_instance
442-
)
443-
artist.metadata.update(prov_item.metadata)
444431

445-
# collect metadata from all (online) music/metadata providers
432+
# collect metadata from all (online) music + metadata providers
446433
# NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
447434
# to not overload the music/metadata providers with api calls
448435
# TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
@@ -459,12 +446,14 @@ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = Fa
459446
await self.mass.music.artists.match_providers(artist)
460447

461448
# collect metadata from all (streaming) music providers
449+
# NOTE: local providers have already pushed their metadata in the sync
462450
for prov_mapping in artist.provider_mappings:
463451
if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
464452
continue
465453
if prov.lookup_key in unique_keys:
466454
continue
467-
unique_keys.add(prov.lookup_key)
455+
if prov.lookup_key not in local_provs:
456+
unique_keys.add(prov.lookup_key)
468457
with suppress(MediaNotFoundError):
469458
prov_item = await self.mass.music.artists.get_provider_item(
470459
prov_mapping.item_id, prov_mapping.provider_instance
@@ -495,26 +484,12 @@ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = Fa
495484
async def _update_album_metadata(self, album: Album, force_refresh: bool = False) -> None:
496485
"""Get/update rich metadata for an album."""
497486
self.logger.debug("Updating metadata for Album %s", album.name)
498-
unique_keys: set[str] = set()
499487
# collect (local) metadata from all local music providers
500488
local_provs = get_global_cache_value("non_streaming_providers")
501489
if TYPE_CHECKING:
502490
local_provs = cast(set[str], local_provs)
503-
for prov_mapping in album.provider_mappings:
504-
if prov_mapping.provider_instance not in local_provs:
505-
continue
506-
if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
507-
continue
508-
if prov.lookup_key in unique_keys:
509-
continue
510-
unique_keys.add(prov.lookup_key)
511-
with suppress(MediaNotFoundError):
512-
prov_item = await self.mass.music.albums.get_provider_item(
513-
prov_mapping.item_id, prov_mapping.provider_instance
514-
)
515-
album.metadata.update(prov_item.metadata)
516491

517-
# collect metadata from all (online) music/metadata providers
492+
# collect metadata from all (online) music + metadata providers
518493
# NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
519494
# to not overload the (free) metadata providers with api calls
520495
# TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
@@ -531,12 +506,15 @@ async def _update_album_metadata(self, album: Album, force_refresh: bool = False
531506
await self.mass.music.albums.match_providers(album)
532507

533508
# collect metadata from all (streaming) music providers
509+
# NOTE: local providers have already pushed their metadata in the sync
510+
unique_keys: set[str] = set()
534511
for prov_mapping in album.provider_mappings:
535512
if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
536513
continue
537514
if prov.lookup_key in unique_keys:
538515
continue
539-
unique_keys.add(prov.lookup_key)
516+
if prov.lookup_key not in local_provs:
517+
unique_keys.add(prov.lookup_key)
540518
with suppress(MediaNotFoundError):
541519
prov_item = await self.mass.music.albums.get_provider_item(
542520
prov_mapping.item_id, prov_mapping.provider_instance
@@ -600,7 +578,9 @@ async def _update_track_metadata(self, track: Track, force_refresh: bool = False
600578
track.metadata.update(prov_item.metadata)
601579

602580
# collect metadata from all metadata providers
603-
if self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
581+
# there is only little metadata available for tracks so we only fetch metadata
582+
# from other sources if the force flag is set
583+
if force_refresh and self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
604584
for provider in self.providers:
605585
if ProviderFeature.TRACK_METADATA not in provider.supported_features:
606586
continue
@@ -756,39 +736,44 @@ async def _metadata_scanner(self) -> None:
756736
self.logger.info("Starting metadata scanner")
757737
self._online_slots_available = MAX_ONLINE_CALLS_PER_RUN
758738
timestamp = int(time() - 60 * 60 * 24 * 30)
739+
# ARTISTS metadata refresh
759740
query = (
760741
f"json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') ISNULL "
761742
f"OR json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') < {timestamp}"
762743
)
763744
for artist in await self.mass.music.artists.library_items(
764-
limit=2500, order_by="random", extra_query=query
745+
limit=50, order_by="random", extra_query=query
765746
):
766747
await self._update_artist_metadata(artist)
767748

749+
# ALBUMS metadata refresh
768750
query = (
769751
f"json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') ISNULL "
770752
f"OR json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') < {timestamp}"
771753
)
772754
for album in await self.mass.music.albums.library_items(
773-
limit=2500, order_by="random", extra_query=query
755+
limit=50, order_by="random", extra_query=query
774756
):
775757
await self._update_album_metadata(album)
776758

759+
# PLAYLISTS metadata refresh
777760
query = (
778761
f"json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') ISNULL "
779762
f"OR json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') < {timestamp}"
780763
)
781764
for playlist in await self.mass.music.playlists.library_items(
782-
limit=2500, order_by="random", extra_query=query
765+
limit=50, order_by="random", extra_query=query
783766
):
784767
await self._update_playlist_metadata(playlist)
785768

769+
# TRACKS metadata refresh
770+
timestamp = int(time() - 60 * 60 * 24 * 30)
786771
query = (
787772
f"json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') ISNULL "
788773
f"OR json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') < {timestamp}"
789774
)
790775
for track in await self.mass.music.tracks.library_items(
791-
limit=2500, order_by="random", extra_query=query
776+
limit=50, order_by="random", extra_query=query
792777
):
793778
await self._update_track_metadata(track)
794779
self.logger.info("Metadata scanner finished.")

music_assistant/server/helpers/tags.py

Lines changed: 21 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from collections.abc import Iterable
1010
from dataclasses import dataclass
1111
from json import JSONDecodeError
12-
from typing import TYPE_CHECKING, Any
12+
from typing import Any
1313

1414
import eyed3
1515

@@ -20,9 +20,6 @@
2020
from music_assistant.constants import MASS_LOGGER_NAME, UNKNOWN_ARTIST
2121
from music_assistant.server.helpers.process import AsyncProcess
2222

23-
if TYPE_CHECKING:
24-
from collections.abc import AsyncGenerator
25-
2623
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.tags")
2724

2825
# silence the eyed3 logger because it is too verbose
@@ -369,16 +366,12 @@ def get(self, key: str, default=None) -> Any:
369366
return self.tags.get(key, default)
370367

371368

372-
async def parse_tags(
373-
input_file: str | AsyncGenerator[bytes, None], file_size: int | None = None
374-
) -> AudioTags:
375-
"""Parse tags from a media file.
376-
377-
input_file may be a (local) filename/url accessible by ffmpeg or
378-
an AsyncGenerator which yields the file contents as bytes.
369+
async def parse_tags(input_file: str, file_size: int | None = None) -> AudioTags:
379370
"""
380-
file_path = input_file if isinstance(input_file, str) else "-"
371+
Parse tags from a media file (or URL).
381372
373+
Input_file may be a (local) filename or URL accessible by ffmpeg.
374+
"""
382375
args = (
383376
"ffprobe",
384377
"-hide_banner",
@@ -393,35 +386,11 @@ async def parse_tags(
393386
"-print_format",
394387
"json",
395388
"-i",
396-
file_path,
389+
input_file,
397390
)
398-
399-
writer_task: asyncio.Task | None = None
400-
ffmpeg_proc = AsyncProcess(args, stdin=file_path == "-", stdout=True)
401-
await ffmpeg_proc.start()
402-
403-
async def writer() -> None:
404-
bytes_read = 0
405-
async for chunk in input_file:
406-
if ffmpeg_proc.closed:
407-
break
408-
await ffmpeg_proc.write(chunk)
409-
bytes_read += len(chunk)
410-
del chunk
411-
if bytes_read > 25 * 1000000:
412-
# this is possibly a m4a file with 'moove atom' metadata at the
413-
# end of the file
414-
# we'll have to read the entire file to do something with it
415-
# for now we just ignore/deny these files
416-
LOGGER.error("Found file with tags not present at beginning of file")
417-
break
418-
419-
if file_path == "-":
420-
# feed the file contents to the process
421-
writer_task = asyncio.create_task(writer)
422-
391+
async with AsyncProcess(args, stdin=False, stdout=True) as ffmpeg:
392+
res = await ffmpeg.read(-1)
423393
try:
424-
res = await ffmpeg_proc.read(-1)
425394
data = json.loads(res)
426395
if error := data.get("error"):
427396
raise InvalidDataError(error["string"])
@@ -438,74 +407,47 @@ async def writer() -> None:
438407
tags.duration = float(tags.raw["format"]["duration"])
439408

440409
if (
441-
not file_path.startswith("http")
442-
and file_path.endswith(".mp3")
410+
not input_file.startswith("http")
411+
and input_file.endswith(".mp3")
443412
and "musicbrainzrecordingid" not in tags.tags
444-
and await asyncio.to_thread(os.path.isfile, file_path)
413+
and await asyncio.to_thread(os.path.isfile, input_file)
445414
):
446415
# eyed3 is able to extract the musicbrainzrecordingid from the unique file id
447416
# this is actually a bug in ffmpeg/ffprobe which does not expose this tag
448417
# so we use this as alternative approach for mp3 files
449-
audiofile = await asyncio.to_thread(eyed3.load, file_path)
418+
audiofile = await asyncio.to_thread(eyed3.load, input_file)
450419
if audiofile is not None and audiofile.tag is not None:
451420
for uf_id in audiofile.tag.unique_file_ids:
452421
if uf_id.owner_id == b"http://musicbrainz.org" and uf_id.uniq_id:
453422
tags.tags["musicbrainzrecordingid"] = uf_id.uniq_id.decode()
454423
break
455-
424+
del audiofile
456425
return tags
457426
except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err:
458-
msg = f"Unable to retrieve info for {file_path}: {err!s}"
427+
msg = f"Unable to retrieve info for {input_file}: {err!s}"
459428
raise InvalidDataError(msg) from err
460-
finally:
461-
if writer_task and not writer_task.done():
462-
writer_task.cancel()
463-
await ffmpeg_proc.close()
464429

465430

466-
async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> bytes | None:
431+
async def get_embedded_image(input_file: str) -> bytes | None:
467432
"""Return embedded image data.
468433
469-
input_file may be a (local) filename/url accessible by ffmpeg or
470-
an AsyncGenerator which yields the file contents as bytes.
434+
Input_file may be a (local) filename or URL accessible by ffmpeg.
471435
"""
472-
file_path = input_file if isinstance(input_file, str) else "-"
473436
args = (
474437
"ffmpeg",
475438
"-hide_banner",
476439
"-loglevel",
477440
"error",
478441
"-i",
479-
file_path,
442+
input_file,
480443
"-an",
481444
"-vcodec",
482445
"mjpeg",
483446
"-f",
484447
"mjpeg",
485448
"-",
486449
)
487-
488-
writer_task: asyncio.Task | None = None
489-
ffmpeg_proc = AsyncProcess(
490-
args, stdin=file_path == "-", stdout=True, stderr=None, name="ffmpeg_image"
491-
)
492-
await ffmpeg_proc.start()
493-
494-
async def writer() -> None:
495-
async for chunk in input_file:
496-
if ffmpeg_proc.closed:
497-
break
498-
await ffmpeg_proc.write(chunk)
499-
await ffmpeg_proc.write_eof()
500-
501-
# feed the file contents to the process stdin
502-
if file_path == "-":
503-
writer_task = asyncio.create_task(writer)
504-
505-
# return image bytes from stdout
506-
try:
507-
return await ffmpeg_proc.read(-1)
508-
finally:
509-
if writer_task and not writer_task.cancelled():
510-
writer_task.cancel()
511-
await ffmpeg_proc.close()
450+
async with AsyncProcess(
451+
args, stdin=False, stdout=True, stderr=None, name="ffmpeg_image"
452+
) as ffmpeg:
453+
return await ffmpeg.read(-1)

music_assistant/server/helpers/util.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,15 +163,29 @@ class TaskManager:
163163
Logging of exceptions is done by the mass.create_task helper.
164164
"""
165165

166-
def __init__(self, mass: MusicAssistant):
166+
def __init__(self, mass: MusicAssistant, limit: int = 0):
167167
"""Initialize the TaskManager."""
168168
self.mass = mass
169169
self._tasks: list[asyncio.Task] = []
170+
self._semaphore = asyncio.Semaphore(limit) if limit else None
170171

171-
def create_task(self, coro: Coroutine) -> None:
172+
def create_task(self, coro: Coroutine) -> asyncio.Task:
172173
"""Create a new task and add it to the manager."""
173174
task = self.mass.create_task(coro)
174175
self._tasks.append(task)
176+
return task
177+
178+
async def create_task_with_limit(self, coro: Coroutine) -> None:
179+
"""Create a new task with semaphore limit."""
180+
assert self._semaphore is not None
181+
182+
def task_done_callback(_task: asyncio.Task) -> None:
183+
self._tasks.remove(task)
184+
self._semaphore.release()
185+
186+
await self._semaphore.acquire()
187+
task: asyncio.Task = self.create_task(coro)
188+
task.add_done_callback(task_done_callback)
175189

176190
async def __aenter__(self) -> Self:
177191
"""Enter context manager."""

0 commit comments

Comments
 (0)