From d01b1d117367119954261d2de333e4ced22c5b63 Mon Sep 17 00:00:00 2001 From: Andrew Sayre <6730289+andrewsayre@users.noreply.github.com> Date: Fri, 10 Jan 2025 23:19:32 -0600 Subject: [PATCH] Combine mixins with command creation (#82) * Move SystemMixin * Move group commands * Move browse commands * Move player mixin --- pyheos/__init__.py | 4 +- pyheos/command/browse.py | 524 +++++++++++---- pyheos/command/connection.py | 27 + pyheos/command/group.py | 215 ++++-- pyheos/command/player.py | 453 +++++++++---- pyheos/command/system.py | 127 +++- pyheos/connection.py | 5 +- pyheos/heos.py | 1222 +--------------------------------- pyheos/options.py | 42 ++ pyheos/player.py | 15 + 10 files changed, 1069 insertions(+), 1565 deletions(-) create mode 100644 pyheos/command/connection.py create mode 100644 pyheos/options.py diff --git a/pyheos/__init__.py b/pyheos/__init__.py index 354f174..ede1ab5 100644 --- a/pyheos/__init__.py +++ b/pyheos/__init__.py @@ -21,7 +21,7 @@ HeosError, ) from .group import HeosGroup -from .heos import Heos, HeosOptions, PlayerUpdateResult +from .heos import Heos from .media import ( AlbumMetadata, BrowseResult, @@ -33,12 +33,14 @@ RetreiveMetadataResult, ServiceOption, ) +from .options import HeosOptions from .player import ( CONTROLS_ALL, CONTROLS_FORWARD_ONLY, CONTROLS_PLAY_STOP, HeosNowPlayingMedia, HeosPlayer, + PlayerUpdateResult, PlayMode, ) from .search import MultiSearchResult, SearchCriteria, SearchResult, SearchStatistic diff --git a/pyheos/command/browse.py b/pyheos/command/browse.py index a36eb8a..92538ec 100644 --- a/pyheos/command/browse.py +++ b/pyheos/command/browse.py @@ -9,10 +9,16 @@ 4.4.18 Get Service Options for now playing screen: OBSOLETE """ -from typing import Any +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any, cast from pyheos import command as c +from pyheos.command.connection import ConnectionMixin from pyheos.const import ( + MUSIC_SOURCE_AUX_INPUT, + MUSIC_SOURCE_FAVORITES, + MUSIC_SOURCE_PLAYLISTS, + SEARCHED_TRACKS, SERVICE_OPTION_ADD_ALBUM_TO_LIBRARY, SERVICE_OPTION_ADD_PLAYLIST_TO_LIBRARY, SERVICE_OPTION_ADD_STATION_TO_LIBRARY, @@ -26,88 +32,193 @@ SERVICE_OPTION_REMOVE_TRACK_FROM_LIBRARY, SERVICE_OPTION_THUMBS_DOWN, SERVICE_OPTION_THUMBS_UP, + VALID_INPUTS, +) +from pyheos.media import ( + BrowseResult, + MediaItem, + MediaMusicSource, + RetreiveMetadataResult, ) from pyheos.message import HeosCommand -from pyheos.types import AddCriteriaType +from pyheos.search import MultiSearchResult, SearchCriteria, SearchResult +from pyheos.types import AddCriteriaType, MediaType + +if TYPE_CHECKING: + from pyheos.heos import Heos + + +class BrowseCommands(ConnectionMixin): + """A mixin to provide access to the browse commands.""" + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Init a new instance of the BrowseMixin.""" + super(BrowseCommands, self).__init__(*args, **kwargs) + + self._music_sources: dict[int, MediaMusicSource] = {} + self._music_sources_loaded = False + + @property + def music_sources(self) -> dict[int, MediaMusicSource]: + """Get available music sources.""" + return self._music_sources + + async def get_music_sources( + self, refresh: bool = False + ) -> dict[int, MediaMusicSource]: + """ + Get available music sources. -class BrowseCommands: - """Define functions for creating browse commands.""" + References: + 4.4.1 Get Music Sources + """ + if not self._music_sources_loaded or refresh: + params = {} + if refresh: + params[c.ATTR_REFRESH] = c.VALUE_ON + message = await self._connection.command( + HeosCommand(c.COMMAND_BROWSE_GET_SOURCES, params) + ) + self._music_sources.clear() + for data in cast(Sequence[dict], message.payload): + source = MediaMusicSource.from_data(data, cast("Heos", self)) + self._music_sources[source.source_id] = source + self._music_sources_loaded = True + return self._music_sources + + async def get_music_source_info( + self, + source_id: int | None = None, + music_source: MediaMusicSource | None = None, + *, + refresh: bool = False, + ) -> MediaMusicSource: + """ + Get information about a specific music source. - @staticmethod - def browse( + References: + 4.4.2 Get Source Info + """ + if source_id is None and music_source is None: + raise ValueError("Either source_id or music_source must be provided") + if source_id is not None and music_source is not None: + raise ValueError("Only one of source_id or music_source should be provided") + + # if only source_id provided, try getting from loaded + if music_source is None: + assert source_id is not None + music_source = self._music_sources.get(source_id) + else: + source_id = music_source.source_id + + if music_source is None or refresh: + # Get the latest information + result = await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_GET_SOURCE_INFO, {c.ATTR_SOURCE_ID: source_id} + ) + ) + payload = cast(dict[str, Any], result.payload) + if music_source is None: + music_source = MediaMusicSource.from_data(payload, cast("Heos", self)) + else: + music_source._update_from_data(payload) + return music_source + + async def browse( + self, source_id: int, container_id: str | None = None, range_start: int | None = None, range_end: int | None = None, - ) -> HeosCommand: - """Create a HEOS command to browse the provided source. + ) -> BrowseResult: + """Browse the contents of the specified source or container. References: 4.4.3 Browse Source 4.4.4 Browse Source Containers 4.4.13 Get HEOS Playlists 4.4.16 Get HEOS History + + Args: + source_id: The identifier of the source to browse. + container_id: The identifier of the container to browse. If not provided, the root of the source will be expanded. + range_start: The index of the first item to return. Both range_start and range_end must be provided to return a range of items. + range_end: The index of the last item to return. Both range_start and range_end must be provided to return a range of items. + Returns: + A BrowseResult instance containing the items in the source or container. """ params: dict[str, Any] = {c.ATTR_SOURCE_ID: source_id} if container_id: params[c.ATTR_CONTAINER_ID] = container_id if isinstance(range_start, int) and isinstance(range_end, int): params[c.ATTR_RANGE] = f"{range_start},{range_end}" - return HeosCommand(c.COMMAND_BROWSE_BROWSE, params) + message = await self._connection.command( + HeosCommand(c.COMMAND_BROWSE_BROWSE, params) + ) + return BrowseResult._from_message(message, cast("Heos", self)) - @staticmethod - def get_music_sources(refresh: bool = False) -> HeosCommand: - """ - Create a HEOS command to get the music sources. + async def browse_media( + self, + media: MediaItem | MediaMusicSource, + range_start: int | None = None, + range_end: int | None = None, + ) -> BrowseResult: + """Browse the contents of the specified media item. References: - 4.4.1 Get Music Sources - """ - params = {} - if refresh: - params[c.ATTR_REFRESH] = c.VALUE_ON - return HeosCommand(c.COMMAND_BROWSE_GET_SOURCES, params) - - @staticmethod - def get_music_source_info(source_id: int) -> HeosCommand: - """ - Create a HEOS command to get information about a music source. + 4.4.3 Browse Source + 4.4.4 Browse Source Containers + 4.4.13 Get HEOS Playlists + 4.4.16 Get HEOS History - References: - 4.4.2 Get Source Info + Args: + media: The media item to browse, must be of type MediaItem or MediaMusicSource. + range_start: The index of the first item to return. Both range_start and range_end must be provided to return a range of items. + range_end: The index of the last item to return. Both range_start and range_end must be provided to return a range of items. + Returns: + A BrowseResult instance containing the items in the media item. """ - return HeosCommand( - c.COMMAND_BROWSE_GET_SOURCE_INFO, {c.ATTR_SOURCE_ID: source_id} - ) + if isinstance(media, MediaMusicSource): + if not media.available: + raise ValueError("Source is not available to browse") + return await self.browse(media.source_id) + else: + if not media.browsable: + raise ValueError("Only media sources and containers can be browsed") + return await self.browse( + media.source_id, media.container_id, range_start, range_end + ) - @staticmethod - def get_search_criteria(source_id: int) -> HeosCommand: + async def get_search_criteria(self, source_id: int) -> list[SearchCriteria]: """ Create a HEOS command to get the search criteria. References: 4.4.5 Get Search Criteria """ - return HeosCommand( - c.COMMAND_BROWSE_GET_SEARCH_CRITERIA, - {c.ATTR_SOURCE_ID: source_id}, + result = await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_GET_SEARCH_CRITERIA, + {c.ATTR_SOURCE_ID: source_id}, + ) ) + payload = cast(list[dict[str, str]], result.payload) + return [SearchCriteria._from_data(data) for data in payload] - @staticmethod - def search( + async def search( + self, source_id: int, search: str, criteria_id: int, range_start: int | None = None, range_end: int | None = None, - ) -> HeosCommand: + ) -> SearchResult: """ Create a HEOS command to search for media. References: - 4.4.6 Search - """ + 4.4.6 Search""" if search == "": raise ValueError("'search' parameter must not be empty") if len(search) > 128: @@ -121,22 +232,47 @@ def search( } if isinstance(range_start, int) and isinstance(range_end, int): params[c.ATTR_RANGE] = f"{range_start},{range_end}" - return HeosCommand(c.COMMAND_BROWSE_SEARCH, params) + result = await self._connection.command( + HeosCommand(c.COMMAND_BROWSE_SEARCH, params) + ) + return SearchResult._from_message(result, cast("Heos", self)) - @staticmethod - def play_station( - player_id: int, - source_id: int, - container_id: str | None, - media_id: str, - ) -> HeosCommand: + async def play_input_source( + self, player_id: int, input_name: str, source_player_id: int | None = None + ) -> None: + """ + Play the specified input source on the specified player. + + References: + 4.4.9 Play Input Source + + Args: + player_id: The identifier of the player to play the input source. + input: The input source to play. + source_player_id: The identifier of the player that has the input source, if different than the player_id. """ - Create a HEOS command to play a station. + params = { + c.ATTR_PLAYER_ID: player_id, + c.ATTR_INPUT: input_name, + } + if source_player_id is not None: + params[c.ATTR_SOURCE_PLAYER_ID] = source_player_id + await self._connection.command(HeosCommand(c.COMMAND_BROWSE_PLAY_INPUT, params)) + + async def play_station( + self, player_id: int, source_id: int, container_id: str | None, media_id: str + ) -> None: + """ + Play the specified station on the specified player. References: 4.4.7 Play Station - Note: Parameters 'cid' and 'name' do not appear to be required in testing, however send 'cid' if provided. + Args: + player_id: The identifier of the player to play the station. + source_id: The identifier of the source containing the station. + container_id: The identifier of the container containing the station. + media_id: The identifier of the station to play. """ params = { c.ATTR_PLAYER_ID: player_id, @@ -145,68 +281,69 @@ def play_station( } if container_id is not None: params[c.ATTR_CONTAINER_ID] = container_id - return HeosCommand(c.COMMAND_BROWSE_PLAY_STREAM, params) + await self._connection.command( + HeosCommand(c.COMMAND_BROWSE_PLAY_STREAM, params) + ) - @staticmethod - def play_preset_station(player_id: int, preset: int) -> HeosCommand: + async def play_preset_station(self, player_id: int, index: int) -> None: """ - Create a HEOS command to play a preset station. + Play the preset station on the specified player (favorite) References: 4.4.8 Play Preset Station - """ - if preset < 1: - raise ValueError(f"Invalid preset: {preset}") - return HeosCommand( - c.COMMAND_BROWSE_PLAY_PRESET, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_PRESET: preset}, - ) - @staticmethod - def play_input_source( - player_id: int, input_name: str, source_player_id: int | None = None - ) -> HeosCommand: + Args: + player_id: The identifier of the player to play the preset station. + index: The index of the preset station to play. """ - Create a HEOS command to play the specified input source. - - References: - 4.4.9 Play Input Source - """ - params = { - c.ATTR_PLAYER_ID: player_id, - c.ATTR_INPUT: input_name, - } - if source_player_id is not None: - params[c.ATTR_SOURCE_PLAYER_ID] = source_player_id - return HeosCommand(c.COMMAND_BROWSE_PLAY_INPUT, params) + if index < 1: + raise ValueError(f"Invalid preset: {index}") + await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_PLAY_PRESET, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_PRESET: index}, + ) + ) - @staticmethod - def play_url(player_id: int, url: str) -> HeosCommand: + async def play_url(self, player_id: int, url: str) -> None: """ - Create a HEOS command to play the specified URL. + Play the specified URL on the specified player. References: 4.4.10 Play URL + + Args: + player_id: The identifier of the player to play the URL. + url: The URL to play. """ - return HeosCommand( - c.COMMAND_BROWSE_PLAY_STREAM, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_URL: url}, + await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_PLAY_STREAM, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_URL: url}, + ) ) - @staticmethod - def add_to_queue( + async def add_to_queue( + self, player_id: int, source_id: int, container_id: str, media_id: str | None = None, add_criteria: AddCriteriaType = AddCriteriaType.PLAY_NOW, - ) -> HeosCommand: + ) -> None: """ - Create a HEOS command to add the specified media to the queue. + Add the specified media item to the queue of the specified player. References: 4.4.11 Add Container to Queue with Options 4.4.12 Add Track to Queue with Options + + Args: + player_id: The identifier of the player to add the media item. + source_id: The identifier of the source containing the media item. + container_id: The identifier of the container containing the media item. + media_id: The identifier of the media item to add. Required for MediaType.Song. + add_criteria: Determines how tracks are added to the queue. The default is AddCriteriaType.PLAY_NOW. """ params = { c.ATTR_PLAYER_ID: player_id, @@ -216,14 +353,42 @@ def add_to_queue( } if media_id is not None: params[c.ATTR_MEDIA_ID] = media_id - return HeosCommand(c.COMMAND_BROWSE_ADD_TO_QUEUE, params) + await self._connection.command( + HeosCommand(c.COMMAND_BROWSE_ADD_TO_QUEUE, params) + ) - @staticmethod - def rename_playlist( - source_id: int, container_id: str, new_name: str - ) -> HeosCommand: + async def add_search_to_queue( + self, + player_id: int, + source_id: int, + search: str, + criteria_container_id: str = SEARCHED_TRACKS, + add_criteria: AddCriteriaType = AddCriteriaType.PLAY_NOW, + ) -> None: + """Add searched tracks to the queue of the specified player. + + References: + 4.4.11 Add Container to Queue with Options + + Args: + player_id: The identifier of the player to add the search results. + source_id: The identifier of the source to search. + search: The search string. + criteria_container_id: the criteria container id prefix. + add_criteria: Determines how tracks are added to the queue. The default is AddCriteriaType.PLAY_NOW. """ - Create a HEOS command to rename a playlist. + await self.add_to_queue( + player_id=player_id, + source_id=source_id, + container_id=f"{criteria_container_id}{search}", + add_criteria=add_criteria, + ) + + async def rename_playlist( + self, source_id: int, container_id: str, new_name: str + ) -> None: + """ + Rename a HEOS playlist. References: 4.4.14 Rename HEOS Playlist @@ -234,58 +399,66 @@ def rename_playlist( raise ValueError( "'new_name' parameter must be less than or equal to 128 characters" ) - return HeosCommand( - c.COMMAND_BROWSE_RENAME_PLAYLIST, - { - c.ATTR_SOURCE_ID: source_id, - c.ATTR_CONTAINER_ID: container_id, - c.ATTR_NAME: new_name, - }, + await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_RENAME_PLAYLIST, + { + c.ATTR_SOURCE_ID: source_id, + c.ATTR_CONTAINER_ID: container_id, + c.ATTR_NAME: new_name, + }, + ) ) - @staticmethod - def delete_playlist(source_id: int, container_id: str) -> HeosCommand: + async def delete_playlist(self, source_id: int, container_id: str) -> None: """ Create a HEOS command to delete a playlist. References: 4.4.15 Delete HEOS Playlist""" - return HeosCommand( - c.COMMAND_BROWSE_DELETE__PLAYLIST, - { - c.ATTR_SOURCE_ID: source_id, - c.ATTR_CONTAINER_ID: container_id, - }, + + await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_DELETE__PLAYLIST, + { + c.ATTR_SOURCE_ID: source_id, + c.ATTR_CONTAINER_ID: container_id, + }, + ) ) - @staticmethod - def retrieve_metadata(source_it: int, container_id: str) -> HeosCommand: + async def retrieve_metadata( + self, source_it: int, container_id: str + ) -> RetreiveMetadataResult: """ - Create a HEOS command to retrieve metadata. + Create a HEOS command to retrieve metadata. Only supported by Rhapsody/Napster music sources. References: 4.4.17 Retrieve Metadata """ - return HeosCommand( - c.COMMAND_BROWSE_RETRIEVE_METADATA, - { - c.ATTR_SOURCE_ID: source_it, - c.ATTR_CONTAINER_ID: container_id, - }, + result = await self._connection.command( + HeosCommand( + c.COMMAND_BROWSE_RETRIEVE_METADATA, + { + c.ATTR_SOURCE_ID: source_it, + c.ATTR_CONTAINER_ID: container_id, + }, + ) ) + return RetreiveMetadataResult._from_message(result) - @staticmethod - def set_service_option( + async def set_service_option( + this, option_id: int, - source_id: int | None, - container_id: str | None, - media_id: str | None, - player_id: int | None, - name: str | None, - criteria_id: int | None, + source_id: int | None = None, + container_id: str | None = None, + media_id: str | None = None, + player_id: int | None = None, + name: str | None = None, + criteria_id: int | None = None, range_start: int | None = None, range_end: int | None = None, - ) -> HeosCommand: + ) -> None: """ Create a HEOS command to set a service option. @@ -429,13 +602,97 @@ def set_service_option( f"{', '.join(disallowed_params.keys())} parameters are not allowed for service option_id {option_id}" ) - # return the command - return HeosCommand(c.COMMAND_BROWSE_SET_SERVICE_OPTION, params) + await this._connection.command( + HeosCommand(c.COMMAND_BROWSE_SET_SERVICE_OPTION, params) + ) + + async def play_media( + self, + player_id: int, + media: MediaItem, + add_criteria: AddCriteriaType = AddCriteriaType.PLAY_NOW, + ) -> None: + """ + Play the specified media item on the specified player. + + Args: + player_id: The identifier of the player to play the media item. + media: The media item to play. + add_criteria: Determines how containers or tracks are added to the queue. The default is AddCriteriaType.PLAY_NOW. + """ + if not media.playable: + raise ValueError(f"Media '{media}' is not playable") + + if media.media_id in VALID_INPUTS: + await self.play_input_source(player_id, media.media_id, media.source_id) + elif media.type == MediaType.STATION: + if media.media_id is None: + raise ValueError(f"'Media '{media}' cannot have a None media_id") + await self.play_station( + player_id=player_id, + source_id=media.source_id, + container_id=media.container_id, + media_id=media.media_id, + ) + else: + # Handles both songs and containers + if media.container_id is None: + raise ValueError(f"Media '{media}' cannot have a None container_id") + await self.add_to_queue( + player_id=player_id, + source_id=media.source_id, + container_id=media.container_id, + media_id=media.media_id, + add_criteria=add_criteria, + ) + + async def get_input_sources(self) -> Sequence[MediaItem]: + """ + Get available input sources. + + This will browse all aux input sources and return a list of all available input sources. - @staticmethod - def multi_search( - search: str, source_ids: list[int] | None, criteria_ids: list[int] | None - ) -> HeosCommand: + Returns: + A sequence of MediaItem instances representing the available input sources across all aux input sources. + """ + result = await self.browse(MUSIC_SOURCE_AUX_INPUT) + input_sources: list[MediaItem] = [] + for item in result.items: + source_browse_result = await item.browse() + input_sources.extend(source_browse_result.items) + + return input_sources + + async def get_favorites(self) -> dict[int, MediaItem]: + """ + Get available favorites. + + This will browse the favorites music source and return a dictionary of all available favorites. + + Returns: + A dictionary with keys representing the index (1-based) of the favorite and the value being the MediaItem instance. + """ + result = await self.browse(MUSIC_SOURCE_FAVORITES) + return {index + 1: source for index, source in enumerate(result.items)} + + async def get_playlists(self) -> Sequence[MediaItem]: + """ + Get available playlists. + + This will browse the playlists music source and return a list of all available playlists. + + Returns: + A sequence of MediaItem instances representing the available playlists. + """ + result = await self.browse(MUSIC_SOURCE_PLAYLISTS) + return result.items + + async def multi_search( + self, + search: str, + source_ids: list[int] | None = None, + criteria_ids: list[int] | None = None, + ) -> MultiSearchResult: """ Create a HEOS command to perform a multi-search. @@ -451,4 +708,7 @@ def multi_search( params[c.ATTR_SOURCE_ID] = ",".join(map(str, source_ids)) if criteria_ids is not None: params[c.ATTR_SEARCH_CRITERIA_ID] = ",".join(map(str, criteria_ids)) - return HeosCommand(c.COMMAND_BROWSE_MULTI_SEARCH, params) + result = await self._connection.command( + HeosCommand(c.COMMAND_BROWSE_MULTI_SEARCH, params) + ) + return MultiSearchResult._from_message(result, cast("Heos", self)) diff --git a/pyheos/command/connection.py b/pyheos/command/connection.py new file mode 100644 index 0000000..e78bda7 --- /dev/null +++ b/pyheos/command/connection.py @@ -0,0 +1,27 @@ +"""Define the connection mixin module.""" + +from pyheos.connection import AutoReconnectingConnection +from pyheos.options import HeosOptions +from pyheos.types import ConnectionState + + +class ConnectionMixin: + "A mixin to provide access to the connection." + + def __init__(self, options: HeosOptions) -> None: + """Init a new instance of the ConnectionMixin.""" + self._options = options + self._connection = AutoReconnectingConnection( + options.host, + timeout=options.timeout, + reconnect=options.auto_reconnect, + reconnect_delay=options.auto_reconnect_delay, + reconnect_max_attempts=options.auto_reconnect_max_attempts, + heart_beat=options.heart_beat, + heart_beat_interval=options.heart_beat_interval, + ) + + @property + def connection_state(self) -> ConnectionState: + """Get the state of the connection.""" + return self._connection.state diff --git a/pyheos/command/group.py b/pyheos/command/group.py index 81a0ce1..bfb4b71 100644 --- a/pyheos/command/group.py +++ b/pyheos/command/group.py @@ -4,117 +4,236 @@ This module creates HEOS group commands. """ +import asyncio from collections.abc import Sequence +from typing import TYPE_CHECKING, Any, cast from pyheos import command as c +from pyheos.command.connection import ConnectionMixin +from pyheos.const import DEFAULT_STEP +from pyheos.group import HeosGroup from pyheos.message import HeosCommand +if TYPE_CHECKING: + from pyheos.heos import Heos -class GroupCommands: - """Define functions for creating group commands.""" - @staticmethod - def get_groups() -> HeosCommand: - """Create a get groups c. +class GroupCommands(ConnectionMixin): + """A mixin to provide access to the group commands.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Init a new instance of the BrowseMixin.""" + super(GroupCommands, self).__init__(*args, **kwargs) + self._groups: dict[int, HeosGroup] = {} + self._groups_loaded = False + + @property + def groups(self) -> dict[int, HeosGroup]: + """Get the loaded groups.""" + return self._groups + + async def get_groups(self, *, refresh: bool = False) -> dict[int, HeosGroup]: + """Get available groups. References: 4.3.1 Get Groups""" - return HeosCommand(c.COMMAND_GET_GROUPS) - - @staticmethod - def get_group_info(group_id: int) -> HeosCommand: + if not self._groups_loaded or refresh: + groups = {} + result = await self._connection.command(HeosCommand(c.COMMAND_GET_GROUPS)) + payload = cast(Sequence[dict], result.payload) + for data in payload: + group = HeosGroup._from_data(data, cast("Heos", self)) + groups[group.group_id] = group + self._groups = groups + # Update all statuses + await asyncio.gather( + *[ + group.refresh(refresh_base_info=False) + for group in self._groups.values() + ] + ) + self._groups_loaded = True + return self._groups + + async def get_group_info( + self, + group_id: int | None = None, + group: HeosGroup | None = None, + *, + refresh: bool = False, + ) -> HeosGroup: """Get information about a group. + Only one of group_id or group should be provided. + + Args: + group_id: The identifier of the group to get information about. Only one of group_id or group should be provided. + group: The HeosGroup instance to update with the latest information. Only one of group_id or group should be provided. + refresh: Set to True to force a refresh of the group information. + References: 4.3.2 Get Group Info""" - return HeosCommand(c.COMMAND_GET_GROUP_INFO, {c.ATTR_GROUP_ID: group_id}) - - @staticmethod - def set_group(player_ids: Sequence[int]) -> HeosCommand: + if group_id is None and group is None: + raise ValueError("Either group_id or group must be provided") + if group_id is not None and group is not None: + raise ValueError("Only one of group_id or group should be provided") + + # if only group_id provided, try getting from loaded + if group is None: + assert group_id is not None + group = self._groups.get(group_id) + else: + group_id = group.group_id + + if group is None or refresh: + # Get the latest information + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_GROUP_INFO, {c.ATTR_GROUP_ID: group_id}) + ) + payload = cast(dict[str, Any], result.payload) + if group is None: + group = HeosGroup._from_data(payload, cast("Heos", self)) + else: + group._update_from_data(payload) + await group.refresh(refresh_base_info=False) + return group + + async def set_group(self, player_ids: Sequence[int]) -> None: """Create, modify, or ungroup players. + Args: + player_ids: The list of player identifiers to group or ungroup. The first player is the group leader. + References: 4.3.3 Set Group""" - return HeosCommand( - c.COMMAND_SET_GROUP, - {c.ATTR_PLAYER_ID: ",".join(map(str, player_ids))}, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_GROUP, + {c.ATTR_PLAYER_ID: ",".join(map(str, player_ids))}, + ) ) - @staticmethod - def get_group_volume(group_id: int) -> HeosCommand: + async def create_group( + self, leader_player_id: int, member_player_ids: Sequence[int] + ) -> None: + """Create a HEOS group. + + Args: + leader_player_id: The player_id of the lead player in the group. + member_player_ids: The player_ids of the group members. + + References: + 4.3.3 Set Group""" + player_ids = [leader_player_id] + player_ids.extend(member_player_ids) + await self.set_group(player_ids) + + async def remove_group(self, group_id: int) -> None: + """Ungroup the specified group. + + Args: + group_id: The identifier of the group to ungroup. Must be the lead player. + + References: + 4.3.3 Set Group + """ + await self.set_group([group_id]) + + async def update_group( + self, group_id: int, member_player_ids: Sequence[int] + ) -> None: + """Update the membership of a group. + + Args: + group_id: The identifier of the group to update (same as the lead player_id) + member_player_ids: The new player_ids of the group members. + """ + await self.create_group(group_id, member_player_ids) + + async def get_group_volume(self, group_id: int) -> int: """ Get the volume of a group. References: 4.3.4 Get Group Volume """ - return HeosCommand(c.COMMAND_GET_GROUP_VOLUME, {c.ATTR_GROUP_ID: group_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_GROUP_VOLUME, {c.ATTR_GROUP_ID: group_id}) + ) + return result.get_message_value_int(c.ATTR_LEVEL) - @staticmethod - def set_group_volume(group_id: int, level: int) -> HeosCommand: + async def set_group_volume(self, group_id: int, level: int) -> None: """Set the volume of the group. References: 4.3.5 Set Group Volume""" if level < 0 or level > 100: raise ValueError("'level' must be in the range 0-100") - return HeosCommand( - c.COMMAND_SET_GROUP_VOLUME, - {c.ATTR_GROUP_ID: group_id, c.ATTR_LEVEL: level}, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_GROUP_VOLUME, + {c.ATTR_GROUP_ID: group_id, c.ATTR_LEVEL: level}, + ) ) - @staticmethod - def group_volume_up(group_id: int, step: int) -> HeosCommand: + async def group_volume_up(self, group_id: int, step: int = DEFAULT_STEP) -> None: """Increase the volume level. References: 4.3.6 Group Volume Up""" if step < 1 or step > 10: raise ValueError("'step' must be in the range 1-10") - return HeosCommand( - c.COMMAND_GROUP_VOLUME_UP, - {c.ATTR_GROUP_ID: group_id, c.ATTR_STEP: step}, + await self._connection.command( + HeosCommand( + c.COMMAND_GROUP_VOLUME_UP, + {c.ATTR_GROUP_ID: group_id, c.ATTR_STEP: step}, + ) ) - @staticmethod - def group_volume_down(group_id: int, step: int) -> HeosCommand: + async def group_volume_down(self, group_id: int, step: int = DEFAULT_STEP) -> None: """Increase the volume level. References: 4.2.7 Group Volume Down""" if step < 1 or step > 10: raise ValueError("'step' must be in the range 1-10") - return HeosCommand( - c.COMMAND_GROUP_VOLUME_DOWN, - {c.ATTR_GROUP_ID: group_id, c.ATTR_STEP: step}, + await self._connection.command( + HeosCommand( + c.COMMAND_GROUP_VOLUME_DOWN, + {c.ATTR_GROUP_ID: group_id, c.ATTR_STEP: step}, + ) ) - @staticmethod - def get_group_mute(group_id: int) -> HeosCommand: + async def get_group_mute(self, group_id: int) -> bool: """Get the mute status of the group. References: 4.3.8 Get Group Mute""" - return HeosCommand(c.COMMAND_GET_GROUP_MUTE, {c.ATTR_GROUP_ID: group_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_GROUP_MUTE, {c.ATTR_GROUP_ID: group_id}) + ) + return result.get_message_value(c.ATTR_STATE) == c.VALUE_ON - @staticmethod - def group_set_mute(group_id: int, state: bool) -> HeosCommand: + async def group_set_mute(self, group_id: int, state: bool) -> None: """Set the mute state of the group. References: 4.3.9 Set Group Mute""" - return HeosCommand( - c.COMMAND_SET_GROUP_MUTE, - { - c.ATTR_GROUP_ID: group_id, - c.ATTR_STATE: c.VALUE_ON if state else c.VALUE_OFF, - }, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_GROUP_MUTE, + { + c.ATTR_GROUP_ID: group_id, + c.ATTR_STATE: c.VALUE_ON if state else c.VALUE_OFF, + }, + ) ) - @staticmethod - def group_toggle_mute(group_id: int) -> HeosCommand: + async def group_toggle_mute(self, group_id: int) -> None: """Toggle the mute state. References: 4.3.10 Toggle Group Mute""" - return HeosCommand(c.COMMAND_GROUP_TOGGLE_MUTE, {c.ATTR_GROUP_ID: group_id}) + await self._connection.command( + HeosCommand(c.COMMAND_GROUP_TOGGLE_MUTE, {c.ATTR_GROUP_ID: group_id}) + ) diff --git a/pyheos/command/player.py b/pyheos/command/player.py index ac75ce5..badc74e 100644 --- a/pyheos/command/player.py +++ b/pyheos/command/player.py @@ -4,169 +4,322 @@ This module creates HEOS player commands. """ -from typing import Any +import asyncio +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any, cast from pyheos import command as c -from pyheos.const import DEFAULT_STEP +from pyheos import const +from pyheos.command.connection import ConnectionMixin +from pyheos.media import QueueItem from pyheos.message import HeosCommand -from pyheos.player import PlayState +from pyheos.player import ( + HeosNowPlayingMedia, + HeosPlayer, + PlayerUpdateResult, + PlayMode, + PlayState, +) from pyheos.types import RepeatType +if TYPE_CHECKING: + from pyheos.heos import Heos -class PlayerCommands: - """Define functions for creating player commands.""" - @staticmethod - def get_players() -> HeosCommand: - """ - Get players. +class PlayerCommands(ConnectionMixin): + """A mixin to provide access to the player commands.""" - References: - 4.2.1 Get Players - """ - return HeosCommand(c.COMMAND_GET_PLAYERS) + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Init a new instance of the BrowseMixin.""" + super(PlayerCommands, self).__init__(*args, **kwargs) - @staticmethod - def get_player_info(player_id: int) -> HeosCommand: - """Get player information. + self._players: dict[int, HeosPlayer] = {} + self._players_loaded = False + + @property + def players(self) -> dict[int, HeosPlayer]: + """Get the loaded players.""" + return self._players + + async def get_players(self, *, refresh: bool = False) -> dict[int, HeosPlayer]: + """Get available players. + + References: + 4.2.1 Get Players""" + # get players and pull initial state + if not self._players_loaded or refresh: + await self.load_players() + return self._players + + async def get_player_info( + self, + player_id: int | None = None, + player: HeosPlayer | None = None, + *, + refresh: bool = False, + ) -> HeosPlayer: + """Get information about a player. + + Only one of player_id or player should be provided. + + Args: + palyer_id: The identifier of the group to get information about. Only one of player_id or player should be provided. + player: The HeosPlayer instance to update with the latest information. Only one of player_id or player should be provided. + refresh: Set to True to force a refresh of the group information. + Returns: + A HeosPlayer instance containing the player information. References: 4.2.2 Get Player Info""" - return HeosCommand(c.COMMAND_GET_PLAYER_INFO, {c.ATTR_PLAYER_ID: player_id}) + if player_id is None and player is None: + raise ValueError("Either player_id or player must be provided") + if player_id is not None and player is not None: + raise ValueError("Only one of player_id or player should be provided") + + # if only palyer_id provided, try getting from loaded + if player is None: + assert player_id is not None + player = self._players.get(player_id) + else: + player_id = player.player_id + + if player is None or refresh: + # Get the latest information + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_PLAYER_INFO, {c.ATTR_PLAYER_ID: player_id}) + ) + + payload = cast(dict[str, Any], result.payload) + if player is None: + player = HeosPlayer._from_data(payload, cast("Heos", self)) + else: + player._update_from_data(payload) + await player.refresh(refresh_base_info=False) + return player + + async def load_players(self) -> PlayerUpdateResult: + """Refresh the players.""" + result = PlayerUpdateResult() + + players: dict[int, HeosPlayer] = {} + response = await self._connection.command(HeosCommand(c.COMMAND_GET_PLAYERS)) + payload = cast(Sequence[dict], response.payload) + existing = list(self._players.values()) + for player_data in payload: + player_id = int(player_data[c.ATTR_PLAYER_ID]) + name = player_data[c.ATTR_NAME] + version = player_data[c.ATTR_VERSION] + serial = player_data.get(c.ATTR_SERIAL) + # Try matching by serial (if available), then try matching by player_id + # and fallback to matching name when firmware version is different + player = next( + ( + player + for player in existing + if (player.serial == serial and serial is not None) + or player.player_id == player_id + or (player.name == name and player.version != version) + ), + None, + ) + if player: + # Found existing, update + if player.player_id != player_id: + result.updated_player_ids[player.player_id] = player_id + player._update_from_data(player_data) + player.available = True + players[player_id] = player + existing.remove(player) + else: + # New player + player = HeosPlayer._from_data(player_data, cast("Heos", self)) + result.added_player_ids.append(player_id) + players[player_id] = player + # For any item remaining in existing, mark unavailalbe, add to updated + for player in existing: + result.removed_player_ids.append(player.player_id) + player.available = False + players[player.player_id] = player + + # Pull data for available players + await asyncio.gather( + *[ + player.refresh(refresh_base_info=False) + for player in players.values() + if player.available + ] + ) + self._players = players + self._players_loaded = True + return result - @staticmethod - def get_play_state(player_id: int) -> HeosCommand: + async def player_get_play_state(self, player_id: int) -> PlayState: """Get the state of the player. References: 4.2.3 Get Play State""" - return HeosCommand(c.COMMAND_GET_PLAY_STATE, {c.ATTR_PLAYER_ID: player_id}) + response = await self._connection.command( + HeosCommand(c.COMMAND_GET_PLAY_STATE, {c.ATTR_PLAYER_ID: player_id}) + ) + return PlayState(response.get_message_value(c.ATTR_STATE)) - @staticmethod - def set_play_state(player_id: int, state: PlayState) -> HeosCommand: + async def player_set_play_state(self, player_id: int, state: PlayState) -> None: """Set the state of the player. References: 4.2.4 Set Play State""" - return HeosCommand( - c.COMMAND_SET_PLAY_STATE, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_STATE: state}, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_PLAY_STATE, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_STATE: state}, + ) ) - @staticmethod - def get_now_playing_media(player_id: int) -> HeosCommand: + async def get_now_playing_media( + self, player_id: int, update: HeosNowPlayingMedia | None = None + ) -> HeosNowPlayingMedia: """Get the now playing media information. + Args: + player_id: The identifier of the player to get the now playing media. + update: The current now playing media information to update. If not provided, a new instance will be created. + + Returns: + A HeosNowPlayingMedia instance containing the now playing media information. + References: 4.2.5 Get Now Playing Media""" - return HeosCommand( - c.COMMAND_GET_NOW_PLAYING_MEDIA, {c.ATTR_PLAYER_ID: player_id} + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_NOW_PLAYING_MEDIA, {c.ATTR_PLAYER_ID: player_id}) ) + instance = update or HeosNowPlayingMedia() + instance._update_from_message(result) + return instance - @staticmethod - def get_volume(player_id: int) -> HeosCommand: + async def player_get_volume(self, player_id: int) -> int: """Get the volume level of the player. References: 4.2.6 Get Volume""" - return HeosCommand(c.COMMAND_GET_VOLUME, {c.ATTR_PLAYER_ID: player_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_VOLUME, {c.ATTR_PLAYER_ID: player_id}) + ) + return result.get_message_value_int(c.ATTR_LEVEL) - @staticmethod - def set_volume(player_id: int, level: int) -> HeosCommand: + async def player_set_volume(self, player_id: int, level: int) -> None: """Set the volume of the player. References: 4.2.7 Set Volume""" if level < 0 or level > 100: raise ValueError("'level' must be in the range 0-100") - return HeosCommand( - c.COMMAND_SET_VOLUME, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_LEVEL: level}, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_VOLUME, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_LEVEL: level}, + ) ) - @staticmethod - def volume_up(player_id: int, step: int = DEFAULT_STEP) -> HeosCommand: + async def player_volume_up( + self, player_id: int, step: int = const.DEFAULT_STEP + ) -> None: """Increase the volume level. References: 4.2.8 Volume Up""" if step < 1 or step > 10: raise ValueError("'step' must be in the range 1-10") - return HeosCommand( - c.COMMAND_VOLUME_UP, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_STEP: step}, + await self._connection.command( + HeosCommand( + c.COMMAND_VOLUME_UP, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_STEP: step}, + ) ) - @staticmethod - def volume_down(player_id: int, step: int = DEFAULT_STEP) -> HeosCommand: + async def player_volume_down( + self, player_id: int, step: int = const.DEFAULT_STEP + ) -> None: """Increase the volume level. References: 4.2.9 Volume Down""" if step < 1 or step > 10: raise ValueError("'step' must be in the range 1-10") - return HeosCommand( - c.COMMAND_VOLUME_DOWN, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_STEP: step}, + await self._connection.command( + HeosCommand( + c.COMMAND_VOLUME_DOWN, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_STEP: step}, + ) ) - @staticmethod - def get_mute(player_id: int) -> HeosCommand: + async def player_get_mute(self, player_id: int) -> bool: """Get the mute state of the player. References: 4.2.10 Get Mute""" - return HeosCommand(c.COMMAND_GET_MUTE, {c.ATTR_PLAYER_ID: player_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_MUTE, {c.ATTR_PLAYER_ID: player_id}) + ) + return result.get_message_value(c.ATTR_STATE) == c.VALUE_ON - @staticmethod - def set_mute(player_id: int, state: bool) -> HeosCommand: + async def player_set_mute(self, player_id: int, state: bool) -> None: """Set the mute state of the player. References: 4.2.11 Set Mute""" - return HeosCommand( - c.COMMAND_SET_MUTE, - { - c.ATTR_PLAYER_ID: player_id, - c.ATTR_STATE: c.VALUE_ON if state else c.VALUE_OFF, - }, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_MUTE, + { + c.ATTR_PLAYER_ID: player_id, + c.ATTR_STATE: c.VALUE_ON if state else c.VALUE_OFF, + }, + ) ) - @staticmethod - def toggle_mute(player_id: int) -> HeosCommand: + async def player_toggle_mute(self, player_id: int) -> None: """Toggle the mute state. References: 4.2.12 Toggle Mute""" - return HeosCommand(c.COMMAND_TOGGLE_MUTE, {c.ATTR_PLAYER_ID: player_id}) + await self._connection.command( + HeosCommand(c.COMMAND_TOGGLE_MUTE, {c.ATTR_PLAYER_ID: player_id}) + ) - @staticmethod - def get_play_mode(player_id: int) -> HeosCommand: - """Get the current play mode. + async def player_get_play_mode(self, player_id: int) -> PlayMode: + """Get the play mode of the player. References: 4.2.13 Get Play Mode""" - return HeosCommand(c.COMMAND_GET_PLAY_MODE, {c.ATTR_PLAYER_ID: player_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_PLAY_MODE, {c.ATTR_PLAYER_ID: player_id}) + ) + return PlayMode._from_data(result) - @staticmethod - def set_play_mode(player_id: int, repeat: RepeatType, shuffle: bool) -> HeosCommand: - """Set the current play mode. + async def player_set_play_mode( + self, player_id: int, repeat: RepeatType, shuffle: bool + ) -> None: + """Set the play mode of the player. References: 4.2.14 Set Play Mode""" - return HeosCommand( - c.COMMAND_SET_PLAY_MODE, - { - c.ATTR_PLAYER_ID: player_id, - c.ATTR_REPEAT: repeat, - c.ATTR_SHUFFLE: c.VALUE_ON if shuffle else c.VALUE_OFF, - }, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_PLAY_MODE, + { + c.ATTR_PLAYER_ID: player_id, + c.ATTR_REPEAT: repeat, + c.ATTR_SHUFFLE: c.VALUE_ON if shuffle else c.VALUE_OFF, + }, + ) ) - @staticmethod - def get_queue( - player_id: int, range_start: int | None = None, range_end: int | None = None - ) -> HeosCommand: + async def player_get_queue( + self, + player_id: int, + range_start: int | None = None, + range_end: int | None = None, + ) -> list[QueueItem]: """Get the queue for the current player. References: @@ -175,125 +328,157 @@ def get_queue( params: dict[str, Any] = {c.ATTR_PLAYER_ID: player_id} if isinstance(range_start, int) and isinstance(range_end, int): params[c.ATTR_RANGE] = f"{range_start},{range_end}" - return HeosCommand(c.COMMAND_GET_QUEUE, params) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_QUEUE, params) + ) + payload = cast(list[dict[str, str]], result.payload) + return [QueueItem.from_data(data) for data in payload] - @staticmethod - def play_queue(player_id: int, queue_id: int) -> HeosCommand: + async def player_play_queue(self, player_id: int, queue_id: int) -> None: """Play a queue item. References: 4.2.16 Play Queue Item""" - return HeosCommand( - c.COMMAND_PLAY_QUEUE, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_QUEUE_ID: queue_id}, + await self._connection.command( + HeosCommand( + c.COMMAND_PLAY_QUEUE, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_QUEUE_ID: queue_id}, + ) ) - @staticmethod - def remove_from_queue(player_id: int, queue_ids: list[int]) -> HeosCommand: + async def player_remove_from_queue( + self, player_id: int, queue_ids: list[int] + ) -> None: """Remove an item from the queue. References: 4.2.17 Remove Item(s) from Queue""" - return HeosCommand( - c.COMMAND_REMOVE_FROM_QUEUE, - { - c.ATTR_PLAYER_ID: player_id, - c.ATTR_QUEUE_ID: ",".join(map(str, queue_ids)), - }, + await self._connection.command( + HeosCommand( + c.COMMAND_REMOVE_FROM_QUEUE, + { + c.ATTR_PLAYER_ID: player_id, + c.ATTR_QUEUE_ID: ",".join(map(str, queue_ids)), + }, + ) ) - @staticmethod - def save_queue(player_id: int, name: str) -> HeosCommand: + async def player_save_queue(self, player_id: int, name: str) -> None: """Save the queue as a playlist. References: 4.2.18 Save Queue as Playlist""" if len(name) > 128: raise ValueError("'name' must be less than or equal to 128 characters") - return HeosCommand( - c.COMMAND_SAVE_QUEUE, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_NAME: name}, + await self._connection.command( + HeosCommand( + c.COMMAND_SAVE_QUEUE, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_NAME: name}, + ) ) - @staticmethod - def clear_queue(player_id: int) -> HeosCommand: + async def player_clear_queue(self, player_id: int) -> None: """Clear the queue. References: 4.2.19 Clear Queue""" - return HeosCommand(c.COMMAND_CLEAR_QUEUE, {c.ATTR_PLAYER_ID: player_id}) + await self._connection.command( + HeosCommand(c.COMMAND_CLEAR_QUEUE, {c.ATTR_PLAYER_ID: player_id}) + ) - @staticmethod - def move_queue_item( - player_id: int, source_queue_ids: list[int], destination_queue_id: int - ) -> HeosCommand: + async def player_move_queue_item( + self, player_id: int, source_queue_ids: list[int], destination_queue_id: int + ) -> None: """Move one or more items in the queue. References: 4.2.20 Move Queue""" - return HeosCommand( - c.COMMAND_MOVE_QUEUE_ITEM, - { - c.ATTR_PLAYER_ID: player_id, - c.ATTR_SOURCE_QUEUE_ID: ",".join(map(str, source_queue_ids)), - c.ATTR_DESTINATION_QUEUE_ID: destination_queue_id, - }, + await self._connection.command( + HeosCommand( + c.COMMAND_MOVE_QUEUE_ITEM, + { + c.ATTR_PLAYER_ID: player_id, + c.ATTR_SOURCE_QUEUE_ID: ",".join(map(str, source_queue_ids)), + c.ATTR_DESTINATION_QUEUE_ID: destination_queue_id, + }, + ) ) - @staticmethod - def play_next(player_id: int) -> HeosCommand: + async def player_play_next(self, player_id: int) -> None: """Play next. References: 4.2.21 Play Next""" - return HeosCommand(c.COMMAND_PLAY_NEXT, {c.ATTR_PLAYER_ID: player_id}) + await self._connection.command( + HeosCommand(c.COMMAND_PLAY_NEXT, {c.ATTR_PLAYER_ID: player_id}) + ) - @staticmethod - def play_previous(player_id: int) -> HeosCommand: + async def player_play_previous(self, player_id: int) -> None: """Play next. References: 4.2.22 Play Previous""" - return HeosCommand(c.COMMAND_PLAY_PREVIOUS, {c.ATTR_PLAYER_ID: player_id}) + await self._connection.command( + HeosCommand(c.COMMAND_PLAY_PREVIOUS, {c.ATTR_PLAYER_ID: player_id}) + ) - @staticmethod - def set_quick_select(player_id: int, quick_select_id: int) -> HeosCommand: + async def player_set_quick_select( + self, player_id: int, quick_select_id: int + ) -> None: """Play a quick select. References: 4.2.23 Set QuickSelect""" if quick_select_id < 1 or quick_select_id > 6: raise ValueError("'quick_select_id' must be in the range 1-6") - return HeosCommand( - c.COMMAND_SET_QUICK_SELECT, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_ID: quick_select_id}, + await self._connection.command( + HeosCommand( + c.COMMAND_SET_QUICK_SELECT, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_ID: quick_select_id}, + ) ) - @staticmethod - def play_quick_select(player_id: int, quick_select_id: int) -> HeosCommand: + async def player_play_quick_select( + self, player_id: int, quick_select_id: int + ) -> None: """Play a quick select. References: 4.2.24 Play QuickSelect""" if quick_select_id < 1 or quick_select_id > 6: raise ValueError("'quick_select_id' must be in the range 1-6") - return HeosCommand( - c.COMMAND_PLAY_QUICK_SELECT, - {c.ATTR_PLAYER_ID: player_id, c.ATTR_ID: quick_select_id}, + await self._connection.command( + HeosCommand( + c.COMMAND_PLAY_QUICK_SELECT, + {c.ATTR_PLAYER_ID: player_id, c.ATTR_ID: quick_select_id}, + ) ) - @staticmethod - def get_quick_selects(player_id: int) -> HeosCommand: + async def player_get_quick_selects(self, player_id: int) -> dict[int, str]: """Get quick selects. References: 4.2.25 Get QuickSelects""" - return HeosCommand(c.COMMAND_GET_QUICK_SELECTS, {c.ATTR_PLAYER_ID: player_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_GET_QUICK_SELECTS, {c.ATTR_PLAYER_ID: player_id}) + ) + return { + int(data[c.ATTR_ID]): data[c.ATTR_NAME] + for data in cast(list[dict], result.payload) + } - @staticmethod - def check_update(player_id: int) -> HeosCommand: + async def player_check_update(self, player_id: int) -> bool: """Check for a firmware update. + Args: + player_id: The identifier of the player to check for a firmware update. + Returns: + True if an update is available, otherwise False. + References: 4.2.26 Check for Firmware Update""" - return HeosCommand(c.COMMAND_CHECK_UPDATE, {c.ATTR_PLAYER_ID: player_id}) + result = await self._connection.command( + HeosCommand(c.COMMAND_CHECK_UPDATE, {c.ATTR_PLAYER_ID: player_id}) + ) + payload = cast(dict[str, Any], result.payload) + return bool(payload[c.ATTR_UPDATE] == c.VALUE_UPDATE_EXIST) diff --git a/pyheos/command/system.py b/pyheos/command/system.py index 20eb4b0..dbf4123 100644 --- a/pyheos/command/system.py +++ b/pyheos/command/system.py @@ -9,63 +9,130 @@ This command will not be implemented in the library. """ +from collections.abc import Sequence +from typing import Any, cast + from pyheos import command as c +from pyheos.command.connection import ConnectionMixin +from pyheos.credentials import Credentials from pyheos.message import HeosCommand +from pyheos.system import HeosHost, HeosSystem + + +class SystemCommands(ConnectionMixin): + """A mixin to provide access to the system commands.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Init a new instance of the BrowseMixin.""" + super(SystemCommands, self).__init__(*args, **kwargs) + + self._current_credentials = self._options.credentials + self._signed_in_username: str | None = None + + @property + def is_signed_in(self) -> bool: + """Return True if the HEOS accuont is signed in.""" + return bool(self._signed_in_username) + @property + def signed_in_username(self) -> str | None: + """Return the signed-in username.""" + return self._signed_in_username -class SystemCommands: - """Define functions for creating system commands.""" + @property + def current_credentials(self) -> Credentials | None: + """Return the current credential, if any set.""" + return self._current_credentials - @staticmethod - def register_for_change_events(enable: bool) -> HeosCommand: + @current_credentials.setter + def current_credentials(self, credentials: Credentials | None) -> None: + """Update the current credential.""" + self._current_credentials = credentials + + async def register_for_change_events(self, enable: bool) -> None: """Register for change events. References: 4.1.1 Register for Change Events""" - return HeosCommand( - c.COMMAND_REGISTER_FOR_CHANGE_EVENTS, - {c.ATTR_ENABLE: c.VALUE_ON if enable else c.VALUE_OFF}, + await self._connection.command( + HeosCommand( + c.COMMAND_REGISTER_FOR_CHANGE_EVENTS, + {c.ATTR_ENABLE: c.VALUE_ON if enable else c.VALUE_OFF}, + ) ) - @staticmethod - def check_account() -> HeosCommand: - """Create a check account c. + async def check_account(self) -> str | None: + """Return the logged in username. References: 4.1.2 HEOS Account Check""" - return HeosCommand(c.COMMAND_ACCOUNT_CHECK) - - @staticmethod - def sign_in(username: str, password: str) -> HeosCommand: - """Create a sign in c. + result = await self._connection.command(HeosCommand(c.COMMAND_ACCOUNT_CHECK)) + if c.ATTR_SIGNED_IN in result.message: + self._signed_in_username = result.get_message_value(c.ATTR_USER_NAME) + else: + self._signed_in_username = None + return self._signed_in_username + + async def sign_in( + self, username: str, password: str, *, update_credential: bool = True + ) -> str: + """Sign in to the HEOS account using the provided credential and return the user name. + + Args: + username: The username of the HEOS account. + password: The password of the HEOS account. + update_credential: Set to True to update the stored credential if login is successful, False to keep the current credential. The default is True. If the credential is updated, it will be used to signed in automatically upon reconnection. + + Returns: + The username of the signed in account. References: 4.1.3 HEOS Account Sign In""" - return HeosCommand( - c.COMMAND_SIGN_IN, - {c.ATTR_USER_NAME: username, c.ATTR_PASSWORD: password}, + result = await self._connection.command( + HeosCommand( + c.COMMAND_SIGN_IN, + {c.ATTR_USER_NAME: username, c.ATTR_PASSWORD: password}, + ) ) + self._signed_in_username = result.get_message_value(c.ATTR_USER_NAME) + if update_credential: + self.current_credentials = Credentials(username, password) + return self._signed_in_username + + async def sign_out(self, *, update_credential: bool = True) -> None: + """Sign out of the HEOS account. - @staticmethod - def sign_out() -> HeosCommand: - """Create a sign out c. + Args: + update_credential: Set to True to clear the stored credential, False to keep it. The default is True. If the credential is cleared, the account will not be signed in automatically upon reconnection. References: 4.1.4 HEOS Account Sign Out""" - return HeosCommand(c.COMMAND_SIGN_OUT) + await self._connection.command(HeosCommand(c.COMMAND_SIGN_OUT)) + self._signed_in_username = None + if update_credential: + self.current_credentials = None - @staticmethod - def heart_beat() -> HeosCommand: - """Create a heart beat c. + async def heart_beat(self) -> None: + """Send a heart beat message to the HEOS device. References: 4.1.5 HEOS System Heart Beat""" - return HeosCommand(c.COMMAND_HEART_BEAT) + await self._connection.command(HeosCommand(c.COMMAND_HEART_BEAT)) - @staticmethod - def reboot() -> HeosCommand: - """Create a reboot c. + async def reboot(self) -> None: + """Reboot the HEOS device. References: 4.1.6 HEOS Speaker Reboot""" - return HeosCommand(c.COMMAND_REBOOT) + await self._connection.command(HeosCommand(c.COMMAND_REBOOT)) + + async def get_system_info(self) -> HeosSystem: + """Get information about the HEOS system. + + References: + 4.2.1 Get Players""" + response = await self._connection.command(HeosCommand(c.COMMAND_GET_PLAYERS)) + payload = cast(Sequence[dict], response.payload) + hosts = list([HeosHost._from_data(item) for item in payload]) + host = next(host for host in hosts if host.ip_address == self._options.host) + return HeosSystem(self._signed_in_username, host, hosts) diff --git a/pyheos/connection.py b/pyheos/connection.py index 5554375..859ead8 100644 --- a/pyheos/connection.py +++ b/pyheos/connection.py @@ -6,8 +6,7 @@ from datetime import datetime, timedelta from typing import TYPE_CHECKING, Final -from pyheos.command import COMMAND_REBOOT -from pyheos.command.system import SystemCommands +from pyheos.command import COMMAND_HEART_BEAT, COMMAND_REBOOT from pyheos.message import HeosCommand, HeosMessage from pyheos.types import ConnectionState @@ -309,7 +308,7 @@ async def _heart_beat_handler(self) -> None: last_acitvity_delta = datetime.now() - self._last_activity if last_acitvity_delta >= self._heart_beat_interval_delta: try: - await self.command(SystemCommands.heart_beat()) + await self.command(HeosCommand(COMMAND_HEART_BEAT)) except (CommandError, asyncio.TimeoutError): # Exit the task, as the connection will be reset/closed. return diff --git a/pyheos/heos.py b/pyheos/heos.py index 1500175..6475693 100644 --- a/pyheos/heos.py +++ b/pyheos/heos.py @@ -1,17 +1,13 @@ """Define the heos manager module.""" -import asyncio import logging -from collections.abc import Sequence -from dataclasses import dataclass, field -from typing import Any, Final, cast +from typing import Any, Final from pyheos.command import COMMAND_SIGN_IN from pyheos.command.browse import BrowseCommands from pyheos.command.group import GroupCommands from pyheos.command.player import PlayerCommands from pyheos.command.system import SystemCommands -from pyheos.credentials import Credentials from pyheos.dispatch import ( CallbackType, ControllerEventCallbackType, @@ -20,29 +16,16 @@ callback_wrapper, ) from pyheos.error import CommandAuthenticationError, CommandFailedError -from pyheos.media import ( - BrowseResult, - MediaItem, - MediaMusicSource, - QueueItem, - RetreiveMetadataResult, -) from pyheos.message import HeosMessage -from pyheos.search import MultiSearchResult, SearchCriteria, SearchResult -from pyheos.system import HeosHost, HeosSystem +from pyheos.options import HeosOptions +from pyheos.player import PlayerUpdateResult +from pyheos.system import HeosSystem from . import command as c from . import const -from .connection import AutoReconnectingConnection from .dispatch import Dispatcher -from .group import HeosGroup -from .player import HeosNowPlayingMedia, HeosPlayer, PlayMode from .types import ( - AddCriteriaType, ConnectionState, - MediaType, - PlayState, - RepeatType, SignalHeosEvent, SignalType, ) @@ -50,1202 +33,7 @@ _LOGGER: Final = logging.getLogger(__name__) -@dataclass -class PlayerUpdateResult: - """Define the result of refreshing players. - - Args: - added_player_ids: The list of player identifiers that have been added. - removed_player_ids: The list of player identifiers that have been removed. - updated_player_ids: A dictionary that maps the previous player_id to the updated player_id - """ - - added_player_ids: list[int] = field(default_factory=list) - removed_player_ids: list[int] = field(default_factory=list) - updated_player_ids: dict[int, int] = field(default_factory=dict) - - -@dataclass(frozen=True) -class HeosOptions: - """ - The HeosOptions encapsulates options for connecting to a Heos System. - - Args: - host: A host name or IP address of a HEOS-capable device. - timeout: The timeout in seconds for opening a connectoin and issuing commands to the device. - events: Set to True to enable event updates, False to disable. The default is True. - heart_beat: Set to True to enable heart beat messages, False to disable. Used in conjunction with heart_beat_delay. The default is True. - heart_beat_interval: The interval in seconds between heart beat messages. Used in conjunction with heart_beat. - all_progress_events: Set to True to receive media progress events, False to only receive media changed events. The default is True. - dispatcher: The dispatcher instance to use for event callbacks. If not provided, an internally created instance will be used. - auto_reconnect: Set to True to automatically reconnect if the connection is lost. The default is False. Used in conjunction with auto_reconnect_delay. - auto_reconnect_delay: The delay in seconds before attempting to reconnect. The default is 10 seconds. Used in conjunction with auto_reconnect. - credentials: credentials to use to automatically sign-in to the HEOS account upon successful connection. If not provided, the account will not be signed in. - """ - - host: str - timeout: float = field(default=const.DEFAULT_TIMEOUT, kw_only=True) - events: bool = field(default=True, kw_only=True) - all_progress_events: bool = field(default=True, kw_only=True) - dispatcher: Dispatcher | None = field(default=None, kw_only=True) - auto_reconnect: bool = field(default=False, kw_only=True) - auto_reconnect_delay: float = field( - default=const.DEFAULT_RECONNECT_DELAY, kw_only=True - ) - auto_reconnect_max_attempts: int = field( - default=const.DEFAULT_RECONNECT_ATTEMPTS, kw_only=True - ) - heart_beat: bool = field(default=True, kw_only=True) - heart_beat_interval: float = field(default=const.DEFAULT_HEART_BEAT, kw_only=True) - credentials: Credentials | None = field(default=None, kw_only=True) - - -class ConnectionMixin: - "A mixin to provide access to the connection." - - def __init__(self, options: HeosOptions) -> None: - """Init a new instance of the ConnectionMixin.""" - self._options = options - self._connection = AutoReconnectingConnection( - options.host, - timeout=options.timeout, - reconnect=options.auto_reconnect, - reconnect_delay=options.auto_reconnect_delay, - reconnect_max_attempts=options.auto_reconnect_max_attempts, - heart_beat=options.heart_beat, - heart_beat_interval=options.heart_beat_interval, - ) - - @property - def connection_state(self) -> ConnectionState: - """Get the state of the connection.""" - return self._connection.state - - -class SystemMixin(ConnectionMixin): - """A mixin to provide access to the system commands.""" - - def __init__(self, *args: Any, **kwargs: Any) -> None: - """Init a new instance of the BrowseMixin.""" - super(SystemMixin, self).__init__(*args, **kwargs) - - self._current_credentials = self._options.credentials - self._signed_in_username: str | None = None - - @property - def is_signed_in(self) -> bool: - """Return True if the HEOS accuont is signed in.""" - return bool(self._signed_in_username) - - @property - def signed_in_username(self) -> str | None: - """Return the signed-in username.""" - return self._signed_in_username - - @property - def current_credentials(self) -> Credentials | None: - """Return the current credential, if any set.""" - return self._current_credentials - - @current_credentials.setter - def current_credentials(self, credentials: Credentials | None) -> None: - """Update the current credential.""" - self._current_credentials = credentials - - async def register_for_change_events(self, enable: bool) -> None: - """Register for change events. - - References: - 4.1.1 Register for Change Events""" - await self._connection.command( - SystemCommands.register_for_change_events(enable) - ) - - async def check_account(self) -> str | None: - """Return the logged in username. - - References: - 4.1.2 HEOS Account Check""" - result = await self._connection.command(SystemCommands.check_account()) - if c.ATTR_SIGNED_IN in result.message: - self._signed_in_username = result.get_message_value(c.ATTR_USER_NAME) - else: - self._signed_in_username = None - return self._signed_in_username - - async def sign_in( - self, username: str, password: str, *, update_credential: bool = True - ) -> str: - """Sign in to the HEOS account using the provided credential and return the user name. - - Args: - username: The username of the HEOS account. - password: The password of the HEOS account. - update_credential: Set to True to update the stored credential if login is successful, False to keep the current credential. The default is True. If the credential is updated, it will be used to signed in automatically upon reconnection. - - Returns: - The username of the signed in account. - - References: - 4.1.3 HEOS Account Sign In""" - result = await self._connection.command( - SystemCommands.sign_in(username, password) - ) - self._signed_in_username = result.get_message_value(c.ATTR_USER_NAME) - if update_credential: - self.current_credentials = Credentials(username, password) - return self._signed_in_username - - async def sign_out(self, *, update_credential: bool = True) -> None: - """Sign out of the HEOS account. - - Args: - update_credential: Set to True to clear the stored credential, False to keep it. The default is True. If the credential is cleared, the account will not be signed in automatically upon reconnection. - - References: - 4.1.4 HEOS Account Sign Out""" - await self._connection.command(SystemCommands.sign_out()) - self._signed_in_username = None - if update_credential: - self.current_credentials = None - - async def heart_beat(self) -> None: - """Send a heart beat message to the HEOS device. - - References: - 4.1.5 HEOS System Heart Beat""" - await self._connection.command(SystemCommands.heart_beat()) - - async def reboot(self) -> None: - """Reboot the HEOS device. - - References: - 4.1.6 HEOS Speaker Reboot""" - await self._connection.command(SystemCommands.reboot()) - - async def get_system_info(self) -> HeosSystem: - """Get information about the HEOS system. - - References: - 4.2.1 Get Players""" - response = await self._connection.command(PlayerCommands.get_players()) - payload = cast(Sequence[dict], response.payload) - hosts = list([HeosHost._from_data(item) for item in payload]) - host = next(host for host in hosts if host.ip_address == self._options.host) - return HeosSystem(self._signed_in_username, host, hosts) - - -class BrowseMixin(ConnectionMixin): - """A mixin to provide access to the browse commands.""" - - def __init__(self, *args: Any, **kwargs: Any) -> None: - """Init a new instance of the BrowseMixin.""" - super(BrowseMixin, self).__init__(*args, **kwargs) - - self._music_sources: dict[int, MediaMusicSource] = {} - self._music_sources_loaded = False - - @property - def music_sources(self) -> dict[int, MediaMusicSource]: - """Get available music sources.""" - return self._music_sources - - async def get_music_sources( - self, refresh: bool = False - ) -> dict[int, MediaMusicSource]: - """ - Get available music sources. - - References: - 4.4.1 Get Music Sources - """ - if not self._music_sources_loaded or refresh: - message = await self._connection.command( - BrowseCommands.get_music_sources(refresh) - ) - self._music_sources.clear() - for data in cast(Sequence[dict], message.payload): - source = MediaMusicSource.from_data(data, cast("Heos", self)) - self._music_sources[source.source_id] = source - self._music_sources_loaded = True - return self._music_sources - - async def get_music_source_info( - self, - source_id: int | None = None, - music_source: MediaMusicSource | None = None, - *, - refresh: bool = False, - ) -> MediaMusicSource: - """ - Get information about a specific music source. - - References: - 4.4.2 Get Source Info - """ - if source_id is None and music_source is None: - raise ValueError("Either source_id or music_source must be provided") - if source_id is not None and music_source is not None: - raise ValueError("Only one of source_id or music_source should be provided") - - # if only source_id provided, try getting from loaded - if music_source is None: - assert source_id is not None - music_source = self._music_sources.get(source_id) - else: - source_id = music_source.source_id - - if music_source is None or refresh: - # Get the latest information - result = await self._connection.command( - BrowseCommands.get_music_source_info(source_id) - ) - payload = cast(dict[str, Any], result.payload) - if music_source is None: - music_source = MediaMusicSource.from_data(payload, cast("Heos", self)) - else: - music_source._update_from_data(payload) - return music_source - - async def browse( - self, - source_id: int, - container_id: str | None = None, - range_start: int | None = None, - range_end: int | None = None, - ) -> BrowseResult: - """Browse the contents of the specified source or container. - - References: - 4.4.3 Browse Source - 4.4.4 Browse Source Containers - 4.4.13 Get HEOS Playlists - 4.4.16 Get HEOS History - - Args: - source_id: The identifier of the source to browse. - container_id: The identifier of the container to browse. If not provided, the root of the source will be expanded. - range_start: The index of the first item to return. Both range_start and range_end must be provided to return a range of items. - range_end: The index of the last item to return. Both range_start and range_end must be provided to return a range of items. - Returns: - A BrowseResult instance containing the items in the source or container. - """ - message = await self._connection.command( - BrowseCommands.browse(source_id, container_id, range_start, range_end) - ) - return BrowseResult._from_message(message, cast("Heos", self)) - - async def browse_media( - self, - media: MediaItem | MediaMusicSource, - range_start: int | None = None, - range_end: int | None = None, - ) -> BrowseResult: - """Browse the contents of the specified media item. - - References: - 4.4.3 Browse Source - 4.4.4 Browse Source Containers - 4.4.13 Get HEOS Playlists - 4.4.16 Get HEOS History - - Args: - media: The media item to browse, must be of type MediaItem or MediaMusicSource. - range_start: The index of the first item to return. Both range_start and range_end must be provided to return a range of items. - range_end: The index of the last item to return. Both range_start and range_end must be provided to return a range of items. - Returns: - A BrowseResult instance containing the items in the media item. - """ - if isinstance(media, MediaMusicSource): - if not media.available: - raise ValueError("Source is not available to browse") - return await self.browse(media.source_id) - else: - if not media.browsable: - raise ValueError("Only media sources and containers can be browsed") - return await self.browse( - media.source_id, media.container_id, range_start, range_end - ) - - async def get_search_criteria(self, source_id: int) -> list[SearchCriteria]: - """ - Create a HEOS command to get the search criteria. - - References: - 4.4.5 Get Search Criteria - """ - result = await self._connection.command( - BrowseCommands.get_search_criteria(source_id) - ) - payload = cast(list[dict[str, str]], result.payload) - return [SearchCriteria._from_data(data) for data in payload] - - async def search( - self, - source_id: int, - search: str, - criteria_id: int, - range_start: int | None = None, - range_end: int | None = None, - ) -> SearchResult: - """ - Create a HEOS command to search for media. - - References: - 4.4.6 Search""" - - result = await self._connection.command( - BrowseCommands.search( - source_id, search, criteria_id, range_start, range_end - ) - ) - return SearchResult._from_message(result, cast("Heos", self)) - - async def play_input_source( - self, player_id: int, input: str, source_player_id: int | None = None - ) -> None: - """ - Play the specified input source on the specified player. - - References: - 4.4.9 Play Input Source - - Args: - player_id: The identifier of the player to play the input source. - input: The input source to play. - source_player_id: The identifier of the player that has the input source, if different than the player_id. - """ - await self._connection.command( - BrowseCommands.play_input_source(player_id, input, source_player_id) - ) - - async def play_station( - self, player_id: int, source_id: int, container_id: str | None, media_id: str - ) -> None: - """ - Play the specified station on the specified player. - - References: - 4.4.7 Play Station - - Args: - player_id: The identifier of the player to play the station. - source_id: The identifier of the source containing the station. - container_id: The identifier of the container containing the station. - media_id: The identifier of the station to play. - """ - await self._connection.command( - BrowseCommands.play_station(player_id, source_id, container_id, media_id) - ) - - async def play_preset_station(self, player_id: int, index: int) -> None: - """ - Play the preset station on the specified player (favorite) - - References: - 4.4.8 Play Preset Station - - Args: - player_id: The identifier of the player to play the preset station. - index: The index of the preset station to play. - """ - await self._connection.command( - BrowseCommands.play_preset_station(player_id, index) - ) - - async def play_url(self, player_id: int, url: str) -> None: - """ - Play the specified URL on the specified player. - - References: - 4.4.10 Play URL - - Args: - player_id: The identifier of the player to play the URL. - url: The URL to play. - """ - await self._connection.command(BrowseCommands.play_url(player_id, url)) - - async def add_to_queue( - self, - player_id: int, - source_id: int, - container_id: str, - media_id: str | None = None, - add_criteria: AddCriteriaType = AddCriteriaType.PLAY_NOW, - ) -> None: - """ - Add the specified media item to the queue of the specified player. - - References: - 4.4.11 Add Container to Queue with Options - 4.4.12 Add Track to Queue with Options - - Args: - player_id: The identifier of the player to add the media item. - source_id: The identifier of the source containing the media item. - container_id: The identifier of the container containing the media item. - media_id: The identifier of the media item to add. Required for MediaType.Song. - add_criteria: Determines how tracks are added to the queue. The default is AddCriteriaType.PLAY_NOW. - """ - await self._connection.command( - BrowseCommands.add_to_queue( - player_id=player_id, - source_id=source_id, - container_id=container_id, - media_id=media_id, - add_criteria=add_criteria, - ) - ) - - async def add_search_to_queue( - self, - player_id: int, - source_id: int, - search: str, - criteria_container_id: str = const.SEARCHED_TRACKS, - add_criteria: AddCriteriaType = AddCriteriaType.PLAY_NOW, - ) -> None: - """Add searched tracks to the queue of the specified player. - - References: - 4.4.11 Add Container to Queue with Options - - Args: - player_id: The identifier of the player to add the search results. - source_id: The identifier of the source to search. - search: The search string. - criteria_container_id: the criteria container id prefix. - add_criteria: Determines how tracks are added to the queue. The default is AddCriteriaType.PLAY_NOW. - """ - await self.add_to_queue( - player_id=player_id, - source_id=source_id, - container_id=f"{criteria_container_id}{search}", - add_criteria=add_criteria, - ) - - async def rename_playlist( - self, source_id: int, container_id: str, new_name: str - ) -> None: - """ - Rename a HEOS playlist. - - References: - 4.4.14 Rename HEOS Playlist - """ - await self._connection.command( - BrowseCommands.rename_playlist(source_id, container_id, new_name) - ) - - async def delete_playlist(self, source_id: int, container_id: str) -> None: - """ - Create a HEOS command to delete a playlist. - - References: - 4.4.15 Delete HEOS Playlist""" - await self._connection.command( - BrowseCommands.delete_playlist(source_id, container_id) - ) - - async def retrieve_metadata( - self, source_it: int, container_id: str - ) -> RetreiveMetadataResult: - """ - Create a HEOS command to retrieve metadata. Only supported by Rhapsody/Napster music sources. - - References: - 4.4.17 Retrieve Metadata - """ - result = await self._connection.command( - BrowseCommands.retrieve_metadata(source_it, container_id) - ) - return RetreiveMetadataResult._from_message(result) - - async def set_service_option( - this, - option_id: int, - source_id: int | None = None, - container_id: str | None = None, - media_id: str | None = None, - player_id: int | None = None, - name: str | None = None, - criteria_id: int | None = None, - range_start: int | None = None, - range_end: int | None = None, - ) -> None: - """ - Create a HEOS command to set a service option. - - References: - 4.4.19 Set Service Option - """ - await this._connection.command( - BrowseCommands.set_service_option( - option_id, - source_id, - container_id, - media_id, - player_id, - name, - criteria_id, - range_start, - range_end, - ) - ) - - async def play_media( - self, - player_id: int, - media: MediaItem, - add_criteria: AddCriteriaType = AddCriteriaType.PLAY_NOW, - ) -> None: - """ - Play the specified media item on the specified player. - - Args: - player_id: The identifier of the player to play the media item. - media: The media item to play. - add_criteria: Determines how containers or tracks are added to the queue. The default is AddCriteriaType.PLAY_NOW. - """ - if not media.playable: - raise ValueError(f"Media '{media}' is not playable") - - if media.media_id in const.VALID_INPUTS: - await self.play_input_source(player_id, media.media_id, media.source_id) - elif media.type == MediaType.STATION: - if media.media_id is None: - raise ValueError(f"'Media '{media}' cannot have a None media_id") - await self.play_station( - player_id=player_id, - source_id=media.source_id, - container_id=media.container_id, - media_id=media.media_id, - ) - else: - # Handles both songs and containers - if media.container_id is None: - raise ValueError(f"Media '{media}' cannot have a None container_id") - await self.add_to_queue( - player_id=player_id, - source_id=media.source_id, - container_id=media.container_id, - media_id=media.media_id, - add_criteria=add_criteria, - ) - - async def get_input_sources(self) -> Sequence[MediaItem]: - """ - Get available input sources. - - This will browse all aux input sources and return a list of all available input sources. - - Returns: - A sequence of MediaItem instances representing the available input sources across all aux input sources. - """ - result = await self.browse(const.MUSIC_SOURCE_AUX_INPUT) - input_sources: list[MediaItem] = [] - for item in result.items: - source_browse_result = await item.browse() - input_sources.extend(source_browse_result.items) - - return input_sources - - async def get_favorites(self) -> dict[int, MediaItem]: - """ - Get available favorites. - - This will browse the favorites music source and return a dictionary of all available favorites. - - Returns: - A dictionary with keys representing the index (1-based) of the favorite and the value being the MediaItem instance. - """ - result = await self.browse(const.MUSIC_SOURCE_FAVORITES) - return {index + 1: source for index, source in enumerate(result.items)} - - async def get_playlists(self) -> Sequence[MediaItem]: - """ - Get available playlists. - - This will browse the playlists music source and return a list of all available playlists. - - Returns: - A sequence of MediaItem instances representing the available playlists. - """ - result = await self.browse(const.MUSIC_SOURCE_PLAYLISTS) - return result.items - - async def multi_search( - self, - search: str, - source_ids: list[int] | None = None, - criteria_ids: list[int] | None = None, - ) -> MultiSearchResult: - """ - Create a HEOS command to perform a multi-search. - - References: - 4.4.20 Multi Search - """ - result = await self._connection.command( - BrowseCommands.multi_search(search, source_ids, criteria_ids) - ) - return MultiSearchResult._from_message(result, cast("Heos", self)) - - -class PlayerMixin(ConnectionMixin): - """A mixin to provide access to the player commands.""" - - def __init__(self, *args: Any, **kwargs: Any) -> None: - """Init a new instance of the BrowseMixin.""" - super(PlayerMixin, self).__init__(*args, **kwargs) - - self._players: dict[int, HeosPlayer] = {} - self._players_loaded = False - - @property - def players(self) -> dict[int, HeosPlayer]: - """Get the loaded players.""" - return self._players - - async def get_players(self, *, refresh: bool = False) -> dict[int, HeosPlayer]: - """Get available players. - - References: - 4.2.1 Get Players""" - # get players and pull initial state - if not self._players_loaded or refresh: - await self.load_players() - return self._players - - async def get_player_info( - self, - player_id: int | None = None, - player: HeosPlayer | None = None, - *, - refresh: bool = False, - ) -> HeosPlayer: - """Get information about a player. - - Only one of player_id or player should be provided. - - Args: - palyer_id: The identifier of the group to get information about. Only one of player_id or player should be provided. - player: The HeosPlayer instance to update with the latest information. Only one of player_id or player should be provided. - refresh: Set to True to force a refresh of the group information. - Returns: - A HeosPlayer instance containing the player information. - - References: - 4.2.2 Get Player Info""" - if player_id is None and player is None: - raise ValueError("Either player_id or player must be provided") - if player_id is not None and player is not None: - raise ValueError("Only one of player_id or player should be provided") - - # if only palyer_id provided, try getting from loaded - if player is None: - assert player_id is not None - player = self._players.get(player_id) - else: - player_id = player.player_id - - if player is None or refresh: - # Get the latest information - result = await self._connection.command( - PlayerCommands.get_player_info(player_id) - ) - - payload = cast(dict[str, Any], result.payload) - if player is None: - player = HeosPlayer._from_data(payload, cast("Heos", self)) - else: - player._update_from_data(payload) - await player.refresh(refresh_base_info=False) - return player - - async def load_players(self) -> "PlayerUpdateResult": - """Refresh the players.""" - result = PlayerUpdateResult() - - players: dict[int, HeosPlayer] = {} - response = await self._connection.command(PlayerCommands.get_players()) - payload = cast(Sequence[dict], response.payload) - existing = list(self._players.values()) - for player_data in payload: - player_id = int(player_data[c.ATTR_PLAYER_ID]) - name = player_data[c.ATTR_NAME] - version = player_data[c.ATTR_VERSION] - serial = player_data.get(c.ATTR_SERIAL) - # Try matching by serial (if available), then try matching by player_id - # and fallback to matching name when firmware version is different - player = next( - ( - player - for player in existing - if (player.serial == serial and serial is not None) - or player.player_id == player_id - or (player.name == name and player.version != version) - ), - None, - ) - if player: - # Found existing, update - if player.player_id != player_id: - result.updated_player_ids[player.player_id] = player_id - player._update_from_data(player_data) - player.available = True - players[player_id] = player - existing.remove(player) - else: - # New player - player = HeosPlayer._from_data(player_data, cast("Heos", self)) - result.added_player_ids.append(player_id) - players[player_id] = player - # For any item remaining in existing, mark unavailalbe, add to updated - for player in existing: - result.removed_player_ids.append(player.player_id) - player.available = False - players[player.player_id] = player - - # Pull data for available players - await asyncio.gather( - *[ - player.refresh(refresh_base_info=False) - for player in players.values() - if player.available - ] - ) - self._players = players - self._players_loaded = True - return result - - async def player_get_play_state(self, player_id: int) -> PlayState: - """Get the state of the player. - - References: - 4.2.3 Get Play State""" - response = await self._connection.command( - PlayerCommands.get_play_state(player_id) - ) - return PlayState(response.get_message_value(c.ATTR_STATE)) - - async def player_set_play_state(self, player_id: int, state: PlayState) -> None: - """Set the state of the player. - - References: - 4.2.4 Set Play State""" - await self._connection.command(PlayerCommands.set_play_state(player_id, state)) - - async def get_now_playing_media( - self, player_id: int, update: HeosNowPlayingMedia | None = None - ) -> HeosNowPlayingMedia: - """Get the now playing media information. - - Args: - player_id: The identifier of the player to get the now playing media. - update: The current now playing media information to update. If not provided, a new instance will be created. - - Returns: - A HeosNowPlayingMedia instance containing the now playing media information. - - References: - 4.2.5 Get Now Playing Media""" - result = await self._connection.command( - PlayerCommands.get_now_playing_media(player_id) - ) - instance = update or HeosNowPlayingMedia() - instance._update_from_message(result) - return instance - - async def player_get_volume(self, player_id: int) -> int: - """Get the volume level of the player. - - References: - 4.2.6 Get Volume""" - result = await self._connection.command(PlayerCommands.get_volume(player_id)) - return result.get_message_value_int(c.ATTR_LEVEL) - - async def player_set_volume(self, player_id: int, level: int) -> None: - """Set the volume of the player. - - References: - 4.2.7 Set Volume""" - await self._connection.command(PlayerCommands.set_volume(player_id, level)) - - async def player_volume_up( - self, player_id: int, step: int = const.DEFAULT_STEP - ) -> None: - """Increase the volume level. - - References: - 4.2.8 Volume Up""" - await self._connection.command(PlayerCommands.volume_up(player_id, step)) - - async def player_volume_down( - self, player_id: int, step: int = const.DEFAULT_STEP - ) -> None: - """Increase the volume level. - - References: - 4.2.9 Volume Down""" - await self._connection.command(PlayerCommands.volume_down(player_id, step)) - - async def player_get_mute(self, player_id: int) -> bool: - """Get the mute state of the player. - - References: - 4.2.10 Get Mute""" - result = await self._connection.command(PlayerCommands.get_mute(player_id)) - return result.get_message_value(c.ATTR_STATE) == c.VALUE_ON - - async def player_set_mute(self, player_id: int, state: bool) -> None: - """Set the mute state of the player. - - References: - 4.2.11 Set Mute""" - await self._connection.command(PlayerCommands.set_mute(player_id, state)) - - async def player_toggle_mute(self, player_id: int) -> None: - """Toggle the mute state. - - References: - 4.2.12 Toggle Mute""" - await self._connection.command(PlayerCommands.toggle_mute(player_id)) - - async def player_get_play_mode(self, player_id: int) -> PlayMode: - """Get the play mode of the player. - - References: - 4.2.13 Get Play Mode""" - result = await self._connection.command(PlayerCommands.get_play_mode(player_id)) - return PlayMode._from_data(result) - - async def player_set_play_mode( - self, player_id: int, repeat: RepeatType, shuffle: bool - ) -> None: - """Set the play mode of the player. - - References: - 4.2.14 Set Play Mode""" - await self._connection.command( - PlayerCommands.set_play_mode(player_id, repeat, shuffle) - ) - - async def player_get_queue( - self, - player_id: int, - range_start: int | None = None, - range_end: int | None = None, - ) -> list[QueueItem]: - """Get the queue for the current player. - - References: - 4.2.15 Get Queue - """ - result = await self._connection.command( - PlayerCommands.get_queue(player_id, range_start, range_end) - ) - payload = cast(list[dict[str, str]], result.payload) - return [QueueItem.from_data(data) for data in payload] - - async def player_play_queue(self, player_id: int, queue_id: int) -> None: - """Play a queue item. - - References: - 4.2.16 Play Queue Item""" - await self._connection.command(PlayerCommands.play_queue(player_id, queue_id)) - - async def player_remove_from_queue( - self, player_id: int, queue_ids: list[int] - ) -> None: - """Remove an item from the queue. - - References: - 4.2.17 Remove Item(s) from Queue""" - await self._connection.command( - PlayerCommands.remove_from_queue(player_id, queue_ids) - ) - - async def player_save_queue(self, player_id: int, name: str) -> None: - """Save the queue as a playlist. - - References: - 4.2.18 Save Queue as Playlist""" - await self._connection.command(PlayerCommands.save_queue(player_id, name)) - - async def player_clear_queue(self, player_id: int) -> None: - """Clear the queue. - - References: - 4.2.19 Clear Queue""" - await self._connection.command(PlayerCommands.clear_queue(player_id)) - - async def player_move_queue_item( - self, player_id: int, source_queue_ids: list[int], destination_queue_id: int - ) -> None: - """Move one or more items in the queue. - - References: - 4.2.20 Move Queue""" - await self._connection.command( - PlayerCommands.move_queue_item( - player_id, source_queue_ids, destination_queue_id - ) - ) - - async def player_play_next(self, player_id: int) -> None: - """Play next. - - References: - 4.2.21 Play Next""" - await self._connection.command(PlayerCommands.play_next(player_id)) - - async def player_play_previous(self, player_id: int) -> None: - """Play next. - - References: - 4.2.22 Play Previous""" - await self._connection.command(PlayerCommands.play_previous(player_id)) - - async def player_set_quick_select( - self, player_id: int, quick_select_id: int - ) -> None: - """Play a quick select. - - References: - 4.2.23 Set QuickSelect""" - await self._connection.command( - PlayerCommands.set_quick_select(player_id, quick_select_id) - ) - - async def player_play_quick_select( - self, player_id: int, quick_select_id: int - ) -> None: - """Play a quick select. - - References: - 4.2.24 Play QuickSelect""" - await self._connection.command( - PlayerCommands.play_quick_select(player_id, quick_select_id) - ) - - async def player_get_quick_selects(self, player_id: int) -> dict[int, str]: - """Get quick selects. - - References: - 4.2.25 Get QuickSelects""" - result = await self._connection.command( - PlayerCommands.get_quick_selects(player_id) - ) - return { - int(data[c.ATTR_ID]): data[c.ATTR_NAME] - for data in cast(list[dict], result.payload) - } - - async def player_check_update(self, player_id: int) -> bool: - """Check for a firmware update. - - Args: - player_id: The identifier of the player to check for a firmware update. - Returns: - True if an update is available, otherwise False. - - References: - 4.2.26 Check for Firmware Update""" - result = await self._connection.command(PlayerCommands.check_update(player_id)) - payload = cast(dict[str, Any], result.payload) - return bool(payload[c.ATTR_UPDATE] == c.VALUE_UPDATE_EXIST) - - -class GroupMixin(ConnectionMixin): - """A mixin to provide access to the group commands.""" - - def __init__(self, *args: Any, **kwargs: Any) -> None: - """Init a new instance of the BrowseMixin.""" - super(GroupMixin, self).__init__(*args, **kwargs) - self._groups: dict[int, HeosGroup] = {} - self._groups_loaded = False - - @property - def groups(self) -> dict[int, HeosGroup]: - """Get the loaded groups.""" - return self._groups - - async def get_groups(self, *, refresh: bool = False) -> dict[int, HeosGroup]: - """Get available groups. - - References: - 4.3.1 Get Groups""" - if not self._groups_loaded or refresh: - groups = {} - result = await self._connection.command(GroupCommands.get_groups()) - payload = cast(Sequence[dict], result.payload) - for data in payload: - group = HeosGroup._from_data(data, cast("Heos", self)) - groups[group.group_id] = group - self._groups = groups - # Update all statuses - await asyncio.gather( - *[ - group.refresh(refresh_base_info=False) - for group in self._groups.values() - ] - ) - self._groups_loaded = True - return self._groups - - async def get_group_info( - self, - group_id: int | None = None, - group: HeosGroup | None = None, - *, - refresh: bool = False, - ) -> HeosGroup: - """Get information about a group. - - Only one of group_id or group should be provided. - - Args: - group_id: The identifier of the group to get information about. Only one of group_id or group should be provided. - group: The HeosGroup instance to update with the latest information. Only one of group_id or group should be provided. - refresh: Set to True to force a refresh of the group information. - - References: - 4.3.2 Get Group Info""" - if group_id is None and group is None: - raise ValueError("Either group_id or group must be provided") - if group_id is not None and group is not None: - raise ValueError("Only one of group_id or group should be provided") - - # if only group_id provided, try getting from loaded - if group is None: - assert group_id is not None - group = self._groups.get(group_id) - else: - group_id = group.group_id - - if group is None or refresh: - # Get the latest information - result = await self._connection.command( - GroupCommands.get_group_info(group_id) - ) - payload = cast(dict[str, Any], result.payload) - if group is None: - group = HeosGroup._from_data(payload, cast("Heos", self)) - else: - group._update_from_data(payload) - await group.refresh(refresh_base_info=False) - return group - - async def set_group(self, player_ids: Sequence[int]) -> None: - """Create, modify, or ungroup players. - - Args: - player_ids: The list of player identifiers to group or ungroup. The first player is the group leader. - - References: - 4.3.3 Set Group""" - await self._connection.command(GroupCommands.set_group(player_ids)) - - async def create_group( - self, leader_player_id: int, member_player_ids: Sequence[int] - ) -> None: - """Create a HEOS group. - - Args: - leader_player_id: The player_id of the lead player in the group. - member_player_ids: The player_ids of the group members. - - References: - 4.3.3 Set Group""" - player_ids = [leader_player_id] - player_ids.extend(member_player_ids) - await self.set_group(player_ids) - - async def remove_group(self, group_id: int) -> None: - """Ungroup the specified group. - - Args: - group_id: The identifier of the group to ungroup. Must be the lead player. - - References: - 4.3.3 Set Group - """ - await self.set_group([group_id]) - - async def update_group( - self, group_id: int, member_player_ids: Sequence[int] - ) -> None: - """Update the membership of a group. - - Args: - group_id: The identifier of the group to update (same as the lead player_id) - member_player_ids: The new player_ids of the group members. - """ - await self.create_group(group_id, member_player_ids) - - async def get_group_volume(self, group_id: int) -> int: - """ - Get the volume of a group. - - References: - 4.3.4 Get Group Volume - """ - result = await self._connection.command( - GroupCommands.get_group_volume(group_id) - ) - return result.get_message_value_int(c.ATTR_LEVEL) - - async def set_group_volume(self, group_id: int, level: int) -> None: - """Set the volume of the group. - - References: - 4.3.5 Set Group Volume""" - await self._connection.command(GroupCommands.set_group_volume(group_id, level)) - - async def group_volume_up( - self, group_id: int, step: int = const.DEFAULT_STEP - ) -> None: - """Increase the volume level. - - References: - 4.3.6 Group Volume Up""" - await self._connection.command(GroupCommands.group_volume_up(group_id, step)) - - async def group_volume_down( - self, group_id: int, step: int = const.DEFAULT_STEP - ) -> None: - """Increase the volume level. - - References: - 4.2.7 Group Volume Down""" - await self._connection.command(GroupCommands.group_volume_down(group_id, step)) - - async def get_group_mute(self, group_id: int) -> bool: - """Get the mute status of the group. - - References: - 4.3.8 Get Group Mute""" - result = await self._connection.command(GroupCommands.get_group_mute(group_id)) - return result.get_message_value(c.ATTR_STATE) == c.VALUE_ON - - async def group_set_mute(self, group_id: int, state: bool) -> None: - """Set the mute state of the group. - - References: - 4.3.9 Set Group Mute""" - await self._connection.command(GroupCommands.group_set_mute(group_id, state)) - - async def group_toggle_mute(self, group_id: int) -> None: - """Toggle the mute state. - - References: - 4.3.10 Toggle Group Mute""" - await self._connection.command(GroupCommands.group_toggle_mute(group_id)) - - -class Heos(SystemMixin, BrowseMixin, GroupMixin, PlayerMixin): +class Heos(SystemCommands, BrowseCommands, GroupCommands, PlayerCommands): """The Heos class provides access to the CLI API.""" @classmethod diff --git a/pyheos/options.py b/pyheos/options.py new file mode 100644 index 0000000..26ec39c --- /dev/null +++ b/pyheos/options.py @@ -0,0 +1,42 @@ +"""Define the options module.""" + +from dataclasses import dataclass, field + +from pyheos import const +from pyheos.credentials import Credentials +from pyheos.dispatch import Dispatcher + + +@dataclass(frozen=True) +class HeosOptions: + """ + The HeosOptions encapsulates options for connecting to a Heos System. + + Args: + host: A host name or IP address of a HEOS-capable device. + timeout: The timeout in seconds for opening a connectoin and issuing commands to the device. + events: Set to True to enable event updates, False to disable. The default is True. + heart_beat: Set to True to enable heart beat messages, False to disable. Used in conjunction with heart_beat_delay. The default is True. + heart_beat_interval: The interval in seconds between heart beat messages. Used in conjunction with heart_beat. + all_progress_events: Set to True to receive media progress events, False to only receive media changed events. The default is True. + dispatcher: The dispatcher instance to use for event callbacks. If not provided, an internally created instance will be used. + auto_reconnect: Set to True to automatically reconnect if the connection is lost. The default is False. Used in conjunction with auto_reconnect_delay. + auto_reconnect_delay: The delay in seconds before attempting to reconnect. The default is 10 seconds. Used in conjunction with auto_reconnect. + credentials: credentials to use to automatically sign-in to the HEOS account upon successful connection. If not provided, the account will not be signed in. + """ + + host: str + timeout: float = field(default=const.DEFAULT_TIMEOUT, kw_only=True) + events: bool = field(default=True, kw_only=True) + all_progress_events: bool = field(default=True, kw_only=True) + dispatcher: Dispatcher | None = field(default=None, kw_only=True) + auto_reconnect: bool = field(default=False, kw_only=True) + auto_reconnect_delay: float = field( + default=const.DEFAULT_RECONNECT_DELAY, kw_only=True + ) + auto_reconnect_max_attempts: int = field( + default=const.DEFAULT_RECONNECT_ATTEMPTS, kw_only=True + ) + heart_beat: bool = field(default=True, kw_only=True) + heart_beat_interval: float = field(default=const.DEFAULT_HEART_BEAT, kw_only=True) + credentials: Credentials | None = field(default=None, kw_only=True) diff --git a/pyheos/player.py b/pyheos/player.py index a3a7a7c..2b5c5f0 100644 --- a/pyheos/player.py +++ b/pyheos/player.py @@ -81,6 +81,21 @@ } +@dataclass +class PlayerUpdateResult: + """Define the result of refreshing players. + + Args: + added_player_ids: The list of player identifiers that have been added. + removed_player_ids: The list of player identifiers that have been removed. + updated_player_ids: A dictionary that maps the previous player_id to the updated player_id + """ + + added_player_ids: list[int] = field(default_factory=list) + removed_player_ids: list[int] = field(default_factory=list) + updated_player_ids: dict[int, int] = field(default_factory=dict) + + @dataclass class HeosNowPlayingMedia: """Define now playing media information."""