diff --git a/poetry.lock b/poetry.lock index 5120e95..01cba95 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,5 +1,16 @@ # This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +[[package]] +name = "asynctest" +version = "0.13.0" +description = "Enhance the standard unittest package with features for testing asyncio libraries" +optional = false +python-versions = ">=3.5" +files = [ + {file = "asynctest-0.13.0-py3-none-any.whl", hash = "sha256:5da6118a7e6d6b54d83a8f7197769d046922a44d2a99c21382f0a6e4fadae676"}, + {file = "asynctest-0.13.0.tar.gz", hash = "sha256:c27862842d15d83e6a34eb0b2866c323880eb3a75e4485b079ea11748fd77fac"}, +] + [[package]] name = "pillow" version = "10.1.0" @@ -67,31 +78,7 @@ files = [ docs = ["furo", "olefile", "sphinx (>=2.4)", "sphinx-copybutton", "sphinx-inline-tabs", "sphinx-removed-in", "sphinxext-opengraph"] tests = ["check-manifest", "coverage", "defusedxml", "markdown2", "olefile", "packaging", "pyroma", "pytest", "pytest-cov", "pytest-timeout"] -[[package]] -name = "winsdk" -version = "1.0.0b10" -description = "Python bindings for the Windows SDK" -optional = false -python-versions = ">=3.8" -files = [ - {file = "winsdk-1.0.0b10-cp310-cp310-win32.whl", hash = "sha256:90f75c67e166d588a045bcde0117a4631c705904f7af4ac42644479dcf0d8c52"}, - {file = "winsdk-1.0.0b10-cp310-cp310-win_amd64.whl", hash = "sha256:c3be3fbf692b8888bac8c0712c490c080ab8976649ef01f9f6365947f4e5a8b1"}, - {file = "winsdk-1.0.0b10-cp310-cp310-win_arm64.whl", hash = "sha256:6ab69dd65d959d94939c21974a33f4f1dfa625106c8784435ecacbd8ff0bf74d"}, - {file = "winsdk-1.0.0b10-cp311-cp311-win32.whl", hash = "sha256:9ea4fdad9ca8a542198aee3c753ac164b8e2f550d760bb88815095d64750e0f5"}, - {file = "winsdk-1.0.0b10-cp311-cp311-win_amd64.whl", hash = "sha256:f12e25bbf0a658270203615677520b8170edf500fba11e0f80359c5dbf090676"}, - {file = "winsdk-1.0.0b10-cp311-cp311-win_arm64.whl", hash = "sha256:e77bce44a9ff151562bd261b2a1a8255e258bb10696d0d31ef63267a27628af1"}, - {file = "winsdk-1.0.0b10-cp312-cp312-win32.whl", hash = "sha256:775a55a71e05ec2aa262c1fd67d80f270d4186bbdbbee2f43c9c412cf76f0761"}, - {file = "winsdk-1.0.0b10-cp312-cp312-win_amd64.whl", hash = "sha256:8231ce5f16e1fc88bb7dda0adf35633b5b26101eae3b0799083ca2177f03e4e5"}, - {file = "winsdk-1.0.0b10-cp312-cp312-win_arm64.whl", hash = "sha256:f4ab469ada19b34ccfc69a148090f98b40a1da1da797b50b9cbba0c090c365a5"}, - {file = "winsdk-1.0.0b10-cp38-cp38-win32.whl", hash = "sha256:786d6b50e4fcb8af2d701d7400c74e1c3f3ab7766ed1dfd516cdd6688072ea87"}, - {file = "winsdk-1.0.0b10-cp38-cp38-win_amd64.whl", hash = "sha256:1d4fdd1f79b41b64fedfbc478a29112edf2076e1a61001eccb536c0568510e74"}, - {file = "winsdk-1.0.0b10-cp39-cp39-win32.whl", hash = "sha256:4f04d3e50eeb8ca5fe4eb2e39785f3fa594199819acdfb23a10aaef4b97699ad"}, - {file = "winsdk-1.0.0b10-cp39-cp39-win_amd64.whl", hash = "sha256:7948bc5d8a02d73b1db043788d32b2988b8e7e29a25e503c21d34478e630eaf1"}, - {file = "winsdk-1.0.0b10-cp39-cp39-win_arm64.whl", hash = "sha256:342b1095cbd937865cee962676e279a1fd28896a0680724fcf9c65157e7ebdb7"}, - {file = "winsdk-1.0.0b10.tar.gz", hash = "sha256:8f39ea759626797449371f857c9085b84bb9f3b6d493dc6525e2cedcb3d15ea2"}, -] - [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "81802080ea5569915adef40ff4afca0124730ee9311f7862b1c9067818a8b7af" +content-hash = "6d8c6136ea7f6f7ee1abe20260f179dbccbe6a63cdf416fbc925248d4b814861" diff --git a/pyproject.toml b/pyproject.toml index 5a8366d..1b468de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,12 @@ python = "^3.11" pillow = "^10.1.0" winsdk = "^1.0.0b10" +[tool.poetry.dev-dependencies] +asynctest = "^0.13.0" + +[tool.poetry.group.dev.dependencies] +asynctest = "^0.13.0" + [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" \ No newline at end of file diff --git a/tests/maincopy copy.py b/tests/maincopy copy.py new file mode 100644 index 0000000..99032ab --- /dev/null +++ b/tests/maincopy copy.py @@ -0,0 +1,172 @@ +import sys +sys.path.append('.') +sys.path.append('..') +import py_now_playing +import asyncio +from pypresence import Presence +import pypresence.exceptions +from pypresence.exceptions import InvalidID +import time +import multiprocessing +import logging +import os +import sys +import traceback +import re +import requests +# Set up logging +#logging.basicConfig(level=logging.NOTSET) +logging.basicConfig(level=logging.INFO) +# Set up logging to a file +logging.basicConfig(filename='C:/Users/buckn/OneDrive/Documents/py-now-playing/tests/app.log', level=logging.INFO) + +# Disable logging to consile with sys +#sys.stdout = open(os.devnull, 'w') +#sys.stderr = open(os.devnull, 'w') +#logging = logging.getlogging(__name__) + + +def get_album_art(artist, title): + # adapted from https://github.com/NextFire/apple-music-discord-rpc/blob/main/music-rpc.ts#L186 + # Uses https://musicbrainz.org/ + # Uses https://coverartarchive.org/ + def lucene_escape(term): + return re.sub(r'([+\-&|!(){}\[\]^"~*?:\\])', r'\\\1', term) + + def remove_parentheses_content(term): + return re.sub(r'\([^)]*\)', '', term).strip() + query_terms = [] + MB_EXCLUDED_NAMES = ['Various Artists', 'Various', 'Unknown Artist', 'Unknown'] + if not all(elem in artist for elem in MB_EXCLUDED_NAMES): + query_terms.append(f'artist:"{lucene_escape(remove_parentheses_content(artist))}"') + if title is not None: + query_terms.append(f'recording:"{lucene_escape(remove_parentheses_content(title))}"') + query = " ".join(query_terms) + + params = { + 'fmt': 'json', + 'limit': '10', + 'query': query, + } + + resp = requests.get('https://musicbrainz.org/ws/2/release', params=params) + json = resp.json() + + for release in json['releases']: + resp = requests.get(f'https://coverartarchive.org/release/{release["id"]}/front') + if resp.status_code == 200: + print(resp.url) + return resp.url + return None + +def start_rpc(client_id, now_playing_queue): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + rpc = Presence(client_id) + + def connect_rpc(): + while True: + try: + logging.info("try to connect") + rpc.connect() + logging.info("Connected to Discord RPC") + break + except Exception as e: + logging.error(f"Failed to connect to Discord RPC: {e}") + traceback.print_exc() + time.sleep(15) + + # Call connect_rpc directly + connect_rpc() + + while True: + try: + now_playing_list = now_playing_queue.get() + logging.info(now_playing_list) + if now_playing_list: + print("retrieved now_playing_list" + str(now_playing_list)) + rpc.update( + details=now_playing_list['title'] or "Unknown Song", + state='by ' + now_playing_list['artist'] if now_playing_list['artist'] is not None else 'by Unknown Artist', + large_image=get_album_art(now_playing_list['artist'],now_playing_list['title']), # Replace with your image key + large_text='Amazon Music', + ) + else: + logging.info("No Music Playing") + rpc.clear() + except (pypresence.exceptions.DiscordNotFound, pypresence.exceptions.InvalidPipe, pypresence.PipeClosed) as e: + connect_rpc() + except BrokenPipeError or EOFError as f: + logging.error(f"BrokenPipeError: {f}") + traceback.print_exc() + except EOFError or UnboundLocalError as g: + logging.error(f"EOFError: {g}") + traceback.print_exc() + + time.sleep(5) + + +async def main(): + np = py_now_playing.NowPlaying() + await np.initalize_mediamanger() + + client_id = '1187213553673965619' # Replace with your client ID + manager = multiprocessing.Manager() + now_playing_queue = manager.Queue() + + # Start the Discord RPC in a separate process + rpc_process = multiprocessing.Process(target=start_rpc, args=(client_id, now_playing_queue)) + rpc_process.start() + + try: + while True: + now_playing = await np.get_active_app_user_model_ids() + print (now_playing) + #now_playing = list(filter(lambda app: app['Name'] == 'Amazon Music', now_playing)) + # AppID Specifically I'm looking for is Chrome._crx_mnlencgjbmniianjkpemfocoke + now_playing = list(filter(lambda app: app['AppID'] == 'Chrome._crx_mnlencgjbmniianjkpemfocoke', now_playing)) + if not now_playing: + now_playing_queue.put(None) + await asyncio.sleep(5) + continue + + now_playing_appid = now_playing[0]['AppID'] + try: + data = await np.get_now_playing(now_playing_appid) + # if 'thumbnail' in data: + # del data['thumbnail'] + + # Turn thumbnail into a photo using async def thumbnail_to_image(self, thumbnail): + if 'thumbnail' in data: + data['thumbnail'] = await np.thumbnail_to_image(data['thumbnail']) + + except PermissionError as permerr: + # logging.error("PermissionError: ") + # traceback.print_exc() + # data = None + pass + now_playing_queue.put(data) + logging.info("A Song is Playing, Here is the Json for that: " + str(data)) + await asyncio.sleep(5) + except KeyboardInterrupt: + logging.info("Interrupted by user, stopping processes...") + rpc_process.terminate() # Terminate the rpc_process + # Kill the event loop + asyncio.get_event_loop().stop() + except OSError as e: + logging.error(f"OSError {e}") + + traceback.print_exc() + now_playing_queue.put(None) + except Exception as e: + logging.error(f"Unexpected error in main: {e}") + traceback.print_exc() + pass + +if __name__ == '__main__': + try: + asyncio.run(main()) + except KeyboardInterrupt: + logging.info("Interrupted by user, caught in if, exiting...") + sys.exit(0) \ No newline at end of file diff --git a/tests/maincopy.pyw b/tests/maincopy.pyw index 41ccf63..05aad06 100644 --- a/tests/maincopy.pyw +++ b/tests/maincopy.pyw @@ -17,12 +17,12 @@ import traceback #logging.basicConfig(level=logging.NOTSET) logging.basicConfig(level=logging.INFO) # Set up logging to a file -logging.basicConfig(filename='app.log', level=logging.INFO) +logging.basicConfig(filename='C:/Users/buckn/OneDrive/Documents/py-now-playing/tests/app.log', level=logging.INFO) # Disable logging to consile with sys #sys.stdout = open(os.devnull, 'w') #sys.stderr = open(os.devnull, 'w') -logger = logging.getLogger(__name__) +#logging = logging.getlogging(__name__) def start_rpc(client_id, now_playing_queue): loop = asyncio.new_event_loop() @@ -33,12 +33,12 @@ def start_rpc(client_id, now_playing_queue): def connect_rpc(): while True: try: - logger.info("try to connect") + logging.info("try to connect") rpc.connect() - logger.info("Connected to Discord RPC") + logging.info("Connected to Discord RPC") break except Exception as e: - logger.error(f"Failed to connect to Discord RPC: {e}") + logging.error(f"Failed to connect to Discord RPC: {e}") traceback.print_exc() time.sleep(15) @@ -48,7 +48,7 @@ def start_rpc(client_id, now_playing_queue): while True: try: now_playing_list = now_playing_queue.get() - logger.info(now_playing_list) + logging.info(now_playing_list) if now_playing_list: rpc.update( details=now_playing_list['title'] or "Unknown Song", @@ -57,15 +57,15 @@ def start_rpc(client_id, now_playing_queue): large_text='Amazon Music', ) else: - logger.info("No Music Playing") + logging.info("No Music Playing") rpc.clear() except (pypresence.exceptions.DiscordNotFound, pypresence.exceptions.InvalidPipe, pypresence.PipeClosed) as e: connect_rpc() except BrokenPipeError or EOFError as f: - logger.error(f"BrokenPipeError: {f}") + logging.error(f"BrokenPipeError: {f}") traceback.print_exc() except EOFError or UnboundLocalError as g: - logger.error(f"EOFError: {g}") + logging.error(f"EOFError: {g}") traceback.print_exc() time.sleep(5) @@ -86,36 +86,36 @@ async def main(): try: while True: now_playing = await np.get_active_app_user_model_ids() - now_playing = list(filter(lambda app: app['Name'] == 'Amazon Music', now_playing)) + now_playing = list(filter(lambda app: app['AppID'] == 'AmazonMobileLLC.AmazonMusic_kc6t79cpj4tp0!AmazonMobileLLC.AmazonMusic', now_playing)) + print(now_playing) if not now_playing: now_playing_queue.put(None) await asyncio.sleep(5) continue - now_playing_appid = now_playing[0]['AppID'] try: data = await np.get_now_playing(now_playing_appid) except PermissionError as permerr: - logger.error("PermissionError: ") + logging.error("PermissionError: ") traceback.print_exc() data = None pass now_playing_queue.put(data) - logger.info("A Song is Playing, Here is the Json for that: " + str(data)) + logging.info("A Song is Playing, Here is the Json for that: " + str(data)) await asyncio.sleep(5) except KeyboardInterrupt: - logger.info("Interrupted by user, stopping processes...") + logging.info("Interrupted by user, stopping processes...") rpc_process.terminate() # Terminate the rpc_process # Kill the event loop asyncio.get_event_loop().stop() except OSError as e: - logger.error(f"OSError {e}") + logging.error(f"OSError {e}") traceback.print_exc() now_playing_queue.put(None) except Exception as e: - logger.error(f"Unexpected error in main: {e}") + logging.error(f"Unexpected error in main: {e}") traceback.print_exc() pass @@ -123,5 +123,5 @@ if __name__ == '__main__': try: asyncio.run(main()) except KeyboardInterrupt: - logger.info("Interrupted by user, caught in if, exiting...") + logging.info("Interrupted by user, caught in if, exiting...") sys.exit(0) \ No newline at end of file diff --git a/tests/tests.py b/tests/tests.py new file mode 100644 index 0000000..97c8e03 --- /dev/null +++ b/tests/tests.py @@ -0,0 +1,33 @@ +import unittest +import asynctest +from unittest.mock import MagicMock, patch +from py_now_playing import NowPlaying # replace 'your_module' with the name of the module that contains the NowPlaying class + +class TestNowPlaying(asynctest.TestCase): + def setUp(self): + self.now_playing = NowPlaying() + + @asynctest.patch('your_module.MediaManager.request_async') + async def test_initalize_mediamanger(self, mock_request_async): + mock_request_async.return_value = 'mock manager' + await self.now_playing.initalize_mediamanger() + self.assertEqual(self.now_playing._manager, 'mock manager') + + async def test_get_sessions(self): + self.now_playing._manager = MagicMock() + self.now_playing._manager.get_sessions.return_value = ['session1', 'session2'] + sessions = await self.now_playing._get_sessions() + self.assertEqual(sessions, ['session1', 'session2']) + + @asynctest.patch('your_module.subprocess.check_output') + @asynctest.patch('your_module.json.loads') + async def test_get_active_app_user_model_ids(self, mock_loads, mock_check_output): + mock_check_output.return_value = 'mock output' + mock_loads.return_value = ['app1', 'app2'] + self.now_playing._get_app_user_model_ids = asynctest.CoroutineMock() + self.now_playing._get_app_user_model_ids.return_value = ['app2'] + active_app_ids = await self.now_playing.get_active_app_user_model_ids() + self.assertEqual(active_app_ids, ['app2']) + +if __name__ == '__main__': + asynctest.main() \ No newline at end of file