diff --git a/CHANGELOG.md b/CHANGELOG.md index 70bab64..d2a69b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,12 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Changed +- The `run` command now catches syntax errors in the input file. ([pybricksdev#126]) - Changed `pybricksdev.compile.compile_multi_file()` to use `mpy-tool` to find imports instead of Python's `ModuleFinder`. ### Fixed - Fixed compiling multi-file projects with implicit namespace packages. +[pybricksdev#126]: https://github.com/pybricks/pybricksdev/pull/126 + ## [2.3.0] - 2025-10-31 ### Added diff --git a/pybricksdev/cli/__init__.py b/pybricksdev/cli/__init__.py index 8f4a1e4..5c73be1 100644 --- a/pybricksdev/cli/__init__.py +++ b/pybricksdev/cli/__init__.py @@ -8,6 +8,7 @@ import contextlib import logging import os +import subprocess import sys from abc import ABC, abstractmethod from enum import IntEnum @@ -25,6 +26,7 @@ from pybricksdev.connections.pybricks import ( HubDisconnectError, HubPowerButtonPressedError, + PybricksHub, ) PROG_NAME = ( @@ -182,6 +184,172 @@ def add_parser(self, subparsers: argparse._SubParsersAction): default=False, ) + async def stay_connected_menu(self, hub: PybricksHub, args: argparse.Namespace): + + if args.conntype == "ble": + from pybricksdev.ble import find_device as find_ble + from pybricksdev.connections.pybricks import PybricksHubBLE + else: + from usb.core import find as find_usb + + from pybricksdev.connections.pybricks import PybricksHubUSB + from pybricksdev.usb import ( + EV3_USB_PID, + LEGO_USB_VID, + MINDSTORMS_INVENTOR_USB_PID, + NXT_USB_PID, + SPIKE_ESSENTIAL_USB_PID, + SPIKE_PRIME_USB_PID, + ) + + def is_pybricks_usb(dev): + return ( + (dev.idVendor == LEGO_USB_VID) + and ( + dev.idProduct + in [ + NXT_USB_PID, + EV3_USB_PID, + SPIKE_PRIME_USB_PID, + SPIKE_ESSENTIAL_USB_PID, + MINDSTORMS_INVENTOR_USB_PID, + ] + ) + and dev.product.endswith("Pybricks") + ) + + class ResponseOptions(IntEnum): + RECOMPILE_RUN = 0 + RECOMPILE_DOWNLOAD = 1 + RUN_STORED = 2 + CHANGE_TARGET_FILE = 3 + EXIT = 4 + + async def reconnect_hub(): + if not await questionary.confirm( + "\nThe hub has been disconnected. Would you like to re-connect?" + ).ask_async(): + exit() + + if args.conntype == "ble": + print( + f"Searching for {args.name or 'any hub with Pybricks service'}..." + ) + device_or_address = await find_ble(args.name) + hub = PybricksHubBLE(device_or_address) + elif args.conntype == "usb": + device_or_address = find_usb(custom_match=is_pybricks_usb) + hub = PybricksHubUSB(device_or_address) + + await hub.connect() + # re-enable echoing of the hub's stdout + hub._enable_line_handler = True + hub.print_output = True + return hub + + response_options = [ + "Recompile and Run", + "Recompile and Download", + "Run Stored Program", + "Change Target File", + "Exit", + ] + # the entry that is selected by default when the menu opens + # this is overridden after the user picks an option + # so that the default option is always the one that was last chosen + default_response_option = ( + ResponseOptions.RECOMPILE_RUN + if args.start + else ResponseOptions.RECOMPILE_DOWNLOAD + ) + + while True: + try: + if args.file is sys.stdin: + await hub.race_disconnect( + hub.race_power_button_press( + questionary.press_any_key_to_continue( + "The hub will stay connected and echo its output to the terminal. Press any key to exit." + ).ask_async() + ) + ) + return + response = await hub.race_disconnect( + hub.race_power_button_press( + questionary.select( + f"Would you like to re-compile {os.path.basename(args.file.name)}?", + response_options, + default=(response_options[default_response_option]), + ).ask_async() + ) + ) + + default_response_option = response_options.index(response) + + match response_options.index(response): + + case ResponseOptions.RECOMPILE_RUN: + with _get_script_path(args.file) as script_path: + await hub.run(script_path, wait=True) + + case ResponseOptions.RECOMPILE_DOWNLOAD: + with _get_script_path(args.file) as script_path: + await hub.download(script_path) + + case ResponseOptions.RUN_STORED: + if hub.fw_version < Version("3.2.0-beta.4"): + print( + "Running a stored program remotely is only supported in the hub firmware version >= v3.2.0." + ) + else: + await hub.start_user_program() + await hub._wait_for_user_program_stop() + + case ResponseOptions.CHANGE_TARGET_FILE: + args.file.close() + while True: + try: + args.file = open( + await hub.race_disconnect( + hub.race_power_button_press( + questionary.path( + "What file would you like to use?" + ).ask_async() + ) + ), + encoding="utf-8", + ) + break + except FileNotFoundError: + print("The file was not found. Please try again.") + # send the new target file to the hub + with _get_script_path(args.file) as script_path: + await hub.download(script_path) + + case _: + return + + except subprocess.CalledProcessError as e: + print() + print("A syntax error occurred while parsing your program:") + print(e.stderr.decode()) + + except HubPowerButtonPressedError: + # This means the user pressed the button on the hub to re-start the + # current program, so the menu was canceled and we are now printing + # the hub stdout until the user program ends on the hub. + try: + await hub._wait_for_power_button_release() + await hub._wait_for_user_program_stop() + + except HubDisconnectError: + hub = await reconnect_hub() + + except HubDisconnectError: + # let terminal cool off before making a new prompt + await asyncio.sleep(0.3) + hub = await reconnect_hub() + async def run(self, args: argparse.Namespace): # Pick the right connection @@ -246,134 +414,15 @@ def is_pybricks_usb(dev): hub._enable_line_handler = True await hub.download(script_path) - if not args.stay_connected: - return - - class ResponseOptions(IntEnum): - RECOMPILE_RUN = 0 - RECOMPILE_DOWNLOAD = 1 - RUN_STORED = 2 - CHANGE_TARGET_FILE = 3 - EXIT = 4 - - async def reconnect_hub(): - if not await questionary.confirm( - "\nThe hub has been disconnected. Would you like to re-connect?" - ).ask_async(): - exit() - - if args.conntype == "ble": - print( - f"Searching for {args.name or 'any hub with Pybricks service'}..." - ) - device_or_address = await find_ble(args.name) - hub = PybricksHubBLE(device_or_address) - elif args.conntype == "usb": - device_or_address = find_usb(custom_match=is_pybricks_usb) - hub = PybricksHubUSB(device_or_address) - - await hub.connect() - # re-enable echoing of the hub's stdout - hub._enable_line_handler = True - hub.print_output = True - return hub - - response_options = [ - "Recompile and Run", - "Recompile and Download", - "Run Stored Program", - "Change Target File", - "Exit", - ] - # the entry that is selected by default when the menu opens - # this is overridden after the user picks an option - # so that the default option is always the one that was last chosen - default_response_option = ( - ResponseOptions.RECOMPILE_RUN - if args.start - else ResponseOptions.RECOMPILE_DOWNLOAD - ) - - while True: - try: - if args.file is sys.stdin: - await hub.race_disconnect( - hub.race_power_button_press( - questionary.press_any_key_to_continue( - "The hub will stay connected and echo its output to the terminal. Press any key to exit." - ).ask_async() - ) - ) - return - response = await hub.race_disconnect( - hub.race_power_button_press( - questionary.select( - f"Would you like to re-compile {os.path.basename(args.file.name)}?", - response_options, - default=(response_options[default_response_option]), - ).ask_async() - ) - ) - - default_response_option = response_options.index(response) - - match response_options.index(response): - - case ResponseOptions.RECOMPILE_RUN: - with _get_script_path(args.file) as script_path: - await hub.run(script_path, wait=True) - - case ResponseOptions.RECOMPILE_DOWNLOAD: - with _get_script_path(args.file) as script_path: - await hub.download(script_path) + if args.stay_connected: + await self.stay_connected_menu(hub, args) - case ResponseOptions.RUN_STORED: - if hub.fw_version < Version("3.2.0-beta.4"): - print( - "Running a stored program remotely is only supported in the hub firmware version >= v3.2.0." - ) - else: - await hub.start_user_program() - await hub._wait_for_user_program_stop() - - case ResponseOptions.CHANGE_TARGET_FILE: - args.file.close() - while True: - try: - args.file = open( - await hub.race_disconnect( - hub.race_power_button_press( - questionary.path( - "What file would you like to use?" - ).ask_async() - ) - ) - ) - break - except FileNotFoundError: - print("The file was not found. Please try again.") - # send the new target file to the hub - with _get_script_path(args.file) as script_path: - await hub.download(script_path) - - case _: - return - - except HubPowerButtonPressedError: - # This means the user pressed the button on the hub to re-start the - # current program, so the menu was canceled and we are now printing - # the hub stdout until the user program ends on the hub. - try: - await hub._wait_for_power_button_release() - await hub._wait_for_user_program_stop() - - except HubDisconnectError: - hub = await reconnect_hub() - - except HubDisconnectError: - # let terminal cool off before making a new prompt - await asyncio.sleep(0.3) - hub = await reconnect_hub() + except subprocess.CalledProcessError as e: + print() + print("A syntax error occurred while parsing your program:") + print(e.stderr.decode()) + if args.stay_connected: + await self.stay_connected_menu(hub, args) finally: await hub.disconnect() diff --git a/tests/test_cli.py b/tests/test_cli.py index 51d58cd..39e6746 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,15 +1,22 @@ """Tests for the pybricksdev CLI commands.""" import argparse +import asyncio import contextlib import io import os +import subprocess import tempfile from unittest.mock import AsyncMock, Mock, mock_open, patch import pytest +from packaging.version import Version from pybricksdev.cli import Compile, Run, Tool, Udev +from pybricksdev.connections.pybricks import ( + HubDisconnectError, + HubPowerButtonPressedError, +) class TestTool: @@ -442,6 +449,391 @@ async def test_run_connection_error(self): # Verify disconnect was not called since connection failed mock_hub.disconnect.assert_not_called() + @pytest.mark.asyncio + async def test_run_syntax_error(self): + """Test that the stay connected menu is called upon a syntax error when the appropriate flag is active.""" + + # Create a mock hub + mock_hub = AsyncMock() + mock_hub.run = AsyncMock( + side_effect=subprocess.CalledProcessError( + returncode=1, cmd="test", stderr=b"test" + ) + ) + mock_hub.connect = AsyncMock() + + # Set up mocks using ExitStack + with contextlib.ExitStack() as stack: + # Create and manage temporary file + temp = stack.enter_context( + tempfile.NamedTemporaryFile( + suffix=".py", mode="w+", delete=False, encoding="utf-8" + ) + ) + temp.write("print('test')") + temp_path = temp.name + stack.callback(os.unlink, temp_path) + + # Create args + args = argparse.Namespace( + conntype="ble", + file=stack.enter_context(open(temp_path, "r", encoding="utf-8")), + name="MyHub", + start=True, + wait=True, + stay_connected=True, + ) + + mock_hub_class = stack.enter_context( + patch( + "pybricksdev.connections.pybricks.PybricksHubBLE", + return_value=mock_hub, + ) + ) + stack.enter_context( + patch("pybricksdev.ble.find_device", return_value="mock_device") + ) + mock_menu = stack.enter_context( + patch("pybricksdev.cli.Run.stay_connected_menu") + ) + + # Run the command + run_cmd = Run() + await run_cmd.run(args) + + # Verify the hub was created and used correctly + mock_hub_class.assert_called_once_with("mock_device") + mock_hub.connect.assert_called_once() + mock_hub.run.assert_called_once_with(temp_path, True) + mock_menu.assert_called_once_with(mock_hub, args) + mock_hub.disconnect.assert_called_once() + + @pytest.mark.asyncio + async def test_stay_connected_menu_integration(self): + """Test all of the basic options in the stay_connected menu.""" + + async def passthrough_awaitable(awaitable): + return await awaitable + + # Create a mock hub + mock_hub = AsyncMock() + mock_hub.fw_version = Version("3.2.0-beta.4") + mock_hub.run = AsyncMock() + mock_hub.connect = AsyncMock() + mock_hub.start_user_program = AsyncMock() + mock_hub._wait_for_user_program_stop = AsyncMock() + mock_hub.race_disconnect = mock_hub.race_power_button_press = AsyncMock( + side_effect=passthrough_awaitable + ) + mock_hub.download = AsyncMock() + + # create a mock questionary menu + mock_selector = AsyncMock() + mock_selector.ask_async.side_effect = [ + "Recompile and Run", + "Recompile and Download", + "Run Stored Program", + "Exit", + ] + + # Set up mocks using ExitStack + with contextlib.ExitStack() as stack: + # Create and manage temporary file + temp = stack.enter_context( + tempfile.NamedTemporaryFile( + suffix=".py", mode="w+", delete=False, encoding="utf-8" + ) + ) + temp.write("print('test')") + temp_path = temp.name + stack.callback(os.unlink, temp_path) + + # Create args + args = argparse.Namespace( + conntype="ble", + file=stack.enter_context(open(temp_path, "r", encoding="utf-8")), + name="MyHub", + start=True, + wait=True, + stay_connected=True, + ) + + mock_hub_class = stack.enter_context( + patch( + "pybricksdev.connections.pybricks.PybricksHubBLE", + return_value=mock_hub, + ) + ) + stack.enter_context( + patch("pybricksdev.ble.find_device", return_value="mock_device") + ) + mock_selector = stack.enter_context( + patch("questionary.select", return_value=mock_selector) + ) + + # Run the command + run_cmd = Run() + await run_cmd.run(args) + + # Verify the hub was created and used correctly + mock_hub_class.assert_called_once_with("mock_device") + mock_hub.connect.assert_called_once() + + assert mock_hub.run.call_count == 2 + mock_hub.run.assert_called_with(temp_path, wait=True) + mock_hub.download.assert_called_once_with(temp_path) + mock_hub.start_user_program.assert_called_once() + mock_hub._wait_for_user_program_stop.assert_called_once() + + assert mock_selector.call_count == 4 + mock_hub.disconnect.assert_called_once() + + @pytest.mark.asyncio + async def test_stay_connected_menu_change_target_file(self): + """Test the change target file option.""" + + async def passthrough_awaitable(awaitable): + return await awaitable + + # Create a mock hub + mock_hub = AsyncMock() + mock_hub.run = AsyncMock() + mock_hub.connect = AsyncMock() + mock_hub.race_disconnect = mock_hub.race_power_button_press = AsyncMock( + side_effect=passthrough_awaitable + ) + mock_hub.download = AsyncMock() + + # create a mock questionary menu + mock_menu = AsyncMock() + mock_menu.ask_async.side_effect = [ + "Change Target File", + "Recompile and Run", + "Exit", + ] + + # Set up mocks using ExitStack + with contextlib.ExitStack() as stack: + # Create and manage temporary file + temp = stack.enter_context( + tempfile.NamedTemporaryFile( + suffix=".py", mode="w+", delete=False, encoding="utf-8" + ) + ) + new_target_file = stack.enter_context( + tempfile.NamedTemporaryFile( + suffix=".py", mode="w+", delete=False, encoding="utf-8" + ) + ) + temp.write("print('test')") + temp_path = temp.name + stack.callback(os.unlink, temp_path) + new_target_file.write("print('test')") + new_path = new_target_file.name + stack.callback(os.unlink, new_path) + + # Create args + args = argparse.Namespace( + conntype="ble", + file=stack.enter_context(open(temp_path, "r", encoding="utf-8")), + name="MyHub", + start=True, + wait=True, + stay_connected=True, + ) + + stack.enter_context( + patch("pybricksdev.ble.find_device", return_value="mock_device") + ) + mock_selector = stack.enter_context( + patch("questionary.select", return_value=mock_menu) + ) + mock_file_selector = stack.enter_context(patch("questionary.path")) + path_obj = AsyncMock() + path_obj.ask_async = AsyncMock(return_value=new_path) + mock_file_selector.return_value = path_obj + + # Run the command + run_cmd = Run() + await run_cmd.stay_connected_menu(mock_hub, args) + + assert mock_selector.call_count == 3 + mock_file_selector.assert_called_once() + mock_hub.download.assert_called_once_with(new_path) + mock_hub.run.assert_called_once_with(new_path, wait=True) + + @pytest.mark.asyncio + async def test_stay_connected_menu_run_stored(self): + """Test that the run_stored program option doesn't call an inaccessible method on an old hub.""" + + async def passthrough_awaitable(awaitable): + return await awaitable + + # Create a mock hub + old_mock_hub = AsyncMock() + + # simulate an old hub that can't handle the run_stored_program option + old_mock_hub.fw_version = Version("3.2.0-beta.3") + old_mock_hub.run = AsyncMock() + old_mock_hub.connect = AsyncMock() + old_mock_hub.start_user_program = AsyncMock() + old_mock_hub.race_disconnect = old_mock_hub.race_power_button_press = AsyncMock( + side_effect=passthrough_awaitable + ) + + # create a mock questionary menu + mock_selector = AsyncMock() + mock_selector.ask_async.side_effect = [ + "Run Stored Program", + "Exit", + ] + + # Set up mocks using ExitStack + with contextlib.ExitStack() as stack: + # Create and manage temporary file + + # Create args + args = argparse.Namespace( + conntype="ble", + file=stack.enter_context( + tempfile.NamedTemporaryFile( + suffix=".py", mode="w+", delete=False, encoding="utf-8" + ) + ), + name="MyHub", + start=True, + wait=True, + stay_connected=True, + ) + + stack.enter_context(patch("questionary.select", return_value=mock_selector)) + + # Run the command + run_cmd = Run() + await run_cmd.stay_connected_menu(old_mock_hub, args) + + old_mock_hub.start_user_program.assert_not_called() + old_mock_hub._wait_for_user_program_stop.assert_not_called() + + @pytest.mark.asyncio + async def test_stay_connected_menu_interruptions(self): + """Test the stay_connected_menu being interrupted by a power button press or hub disconnect.""" + mock_menu_call_count = 0 + + # simulates the hub disconnecting on the first call, + async def mock_race_disconnect(awaitable): + task = asyncio.create_task(awaitable) + try: + if mock_menu_call_count == 1: + await asyncio.sleep(0.01) + task.cancel() + raise HubDisconnectError("hub disconnected") + return await task + except BaseException: + await asyncio.sleep(0.01) + task.cancel() + raise + + # simulate the power button being pressed on the second call + async def mock_race_power_button_press(awaitable): + task = asyncio.create_task(awaitable) + try: + if mock_menu_call_count == 2: + await asyncio.sleep(0.01) + task.cancel() + raise HubPowerButtonPressedError("power button pressed") + return await task + except BaseException: + await asyncio.sleep(0.01) + task.cancel() + raise + + # should be called but cancelled twice, returning "Recompile and Run" the third time + async def mock_menu_function(): + nonlocal mock_menu_call_count + mock_menu_call_count += 1 + if mock_menu_call_count <= 3: + return "Recompile and Run" + else: + return "Exit" + + # Create a mock hub + mock_hub = AsyncMock() + mock_hub.run = AsyncMock() + mock_hub.connect = AsyncMock() + mock_hub.disconnect = AsyncMock() + mock_hub.race_disconnect = AsyncMock( + side_effect=mock_race_disconnect, + ) + mock_hub.race_power_button_press = AsyncMock( + side_effect=mock_race_power_button_press, + ) + mock_hub._wait_for_power_button_release = AsyncMock() + mock_hub._wait_for_user_program_stop = AsyncMock() + # create a mock questionary menu + mock_menu = AsyncMock() + mock_menu.ask_async.side_effect = mock_menu_function + + # create a mock confirmation menu to reconnect to the hub + mock_confirm_menu = AsyncMock() + mock_confirm_menu.ask_async.return_value = True + + # Set up mocks using ExitStack + with contextlib.ExitStack() as stack: + # Create and manage temporary file + temp = stack.enter_context( + tempfile.NamedTemporaryFile( + suffix=".py", mode="w+", delete=False, encoding="utf-8" + ) + ) + temp.write("print('test')") + temp_path = temp.name + stack.callback(os.unlink, temp_path) + + # Create args + args = argparse.Namespace( + conntype="ble", + file=stack.enter_context(open(temp_path, "r", encoding="utf-8")), + name="MyHub", + start=True, + wait=True, + stay_connected=True, + ) + + mock_hub_class = stack.enter_context( + patch( + "pybricksdev.connections.pybricks.PybricksHubBLE", + return_value=mock_hub, + ) + ) + + stack.enter_context( + patch("pybricksdev.ble.find_device", return_value="mock_device") + ) + mock_selector = stack.enter_context( + patch("questionary.select", return_value=mock_menu) + ) + mock_confirm = stack.enter_context( + patch("questionary.confirm", return_value=mock_confirm_menu) + ) + + # Run the command + run_cmd = Run() + await run_cmd.stay_connected_menu(mock_hub, args) + + assert mock_selector.call_count == 4 + # a confirmation menu should be triggered and the hub should be re-instantiated upon a HubDisconnectError + mock_confirm.assert_called_once() + mock_hub_class.assert_called_once() + mock_hub.connect.assert_called_once() + + # these functions should be triggered upon a HubPowerButtonPressedError + mock_hub._wait_for_power_button_release.assert_called_once() + mock_hub._wait_for_user_program_stop.assert_called_once() + + # this should only be called once because the menu was canceled the first two times it was called + mock_hub.run.assert_called_once_with(temp_path, wait=True) + class TestCompile: """Tests for the Compile command."""