From c60d628e48cac193de582ddb702e53d73cf48aa6 Mon Sep 17 00:00:00 2001 From: Scott Nemes Date: Fri, 9 Jan 2026 19:10:08 -0800 Subject: [PATCH 1/4] Created new data class for handling SQL/command results to make further code improvements easier --- changelog.md | 8 +++ mycli/completion_refresher.py | 7 +- mycli/main.py | 56 +++++++-------- mycli/packages/special/dbcommands.py | 18 ++--- mycli/packages/special/delimitercommand.py | 10 +-- mycli/packages/special/iocommands.py | 81 +++++++++++----------- mycli/packages/special/main.py | 14 ++-- mycli/packages/sqlresult.py | 17 +++++ mycli/sqlexecute.py | 9 +-- test/test_completion_refresher.py | 21 +++--- test/test_main.py | 6 +- test/test_special_iocommands.py | 10 +-- test/test_tabular_output.py | 11 +-- 13 files changed, 150 insertions(+), 118 deletions(-) create mode 100644 mycli/packages/sqlresult.py diff --git a/changelog.md b/changelog.md index 54d1ba7d..ad59f82d 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,11 @@ +Upcoming (TBD) +============== + +Internal +-------- +* Create new data class to handle SQL/command results to make further code improvements easier + + 1.44.0 (2026/01/08) ============== diff --git a/mycli/completion_refresher.py b/mycli/completion_refresher.py index e3eb4984..1b8ffb07 100644 --- a/mycli/completion_refresher.py +++ b/mycli/completion_refresher.py @@ -2,6 +2,7 @@ from typing import Callable from mycli.packages.special.main import COMMANDS +from mycli.packages.sqlresult import SQLResult from mycli.sqlcompleter import SQLCompleter from mycli.sqlexecute import ServerSpecies, SQLExecute @@ -18,7 +19,7 @@ def refresh( executor: SQLExecute, callbacks: Callable | list[Callable], completer_options: dict | None = None, - ) -> list[tuple]: + ) -> list[SQLResult]: """Creates a SQLCompleter object and populates it with the relevant completion suggestions in a background thread. @@ -35,14 +36,14 @@ def refresh( if self.is_refreshing(): self._restart_refresh.set() - return [(None, None, None, "Auto-completion refresh restarted.")] + return [SQLResult(status="Auto-completion refresh restarted.")] else: self._completer_thread = threading.Thread( target=self._bg_refresh, args=(executor, callbacks, completer_options), name="completion_refresh" ) self._completer_thread.daemon = True self._completer_thread.start() - return [(None, None, None, "Auto-completion refresh started in the background.")] + return [SQLResult(status="Auto-completion refresh started in the background.")] def is_refreshing(self) -> bool: return bool(self._completer_thread and self._completer_thread.is_alive()) diff --git a/mycli/main.py b/mycli/main.py index 226252f3..49025273 100755 --- a/mycli/main.py +++ b/mycli/main.py @@ -60,6 +60,7 @@ from mycli.packages.prompt_utils import confirm, confirm_destructive_query from mycli.packages.special.favoritequeries import FavoriteQueries from mycli.packages.special.main import ArgType +from mycli.packages.sqlresult import SQLResult from mycli.packages.tabular_output import sql_format from mycli.packages.toolkit.history import FileHistoryWithTimestamp from mycli.sqlcompleter import SQLCompleter @@ -272,49 +273,49 @@ def register_special_commands(self) -> None: self.change_prompt_format, "prompt", "\\R", "Change prompt format.", aliases=["\\R"], case_sensitive=True ) - def manual_reconnect(self, arg: str = "", **_) -> Generator[tuple, None, None]: + def manual_reconnect(self, arg: str = "", **_) -> Generator[SQLResult, None, None]: """ Interactive method to use for the \r command, so that the utility method may be cleanly used elsewhere. """ if not self.reconnect(database=arg): - yield (None, None, None, "Not connected") + yield SQLResult(status="Not connected") elif not arg or arg == '``': - yield (None, None, None, None) + yield SQLResult() else: yield self.change_db(arg).send(None) - def enable_show_warnings(self, **_) -> Generator[tuple, None, None]: + def enable_show_warnings(self, **_) -> Generator[SQLResult, None, None]: self.show_warnings = True msg = "Show warnings enabled." - yield (None, None, None, msg) + yield SQLResult(status=msg) - def disable_show_warnings(self, **_) -> Generator[tuple, None, None]: + def disable_show_warnings(self, **_) -> Generator[SQLResult, None, None]: self.show_warnings = False msg = "Show warnings disabled." - yield (None, None, None, msg) + yield SQLResult(status=msg) - def change_table_format(self, arg: str, **_) -> Generator[tuple, None, None]: + def change_table_format(self, arg: str, **_) -> Generator[SQLResult, None, None]: try: self.main_formatter.format_name = arg - yield (None, None, None, f"Changed table format to {arg}") + yield SQLResult(status=f"Changed table format to {arg}") except ValueError: msg = f"Table format {arg} not recognized. Allowed formats:" for table_type in self.main_formatter.supported_formats: msg += f"\n\t{table_type}" - yield (None, None, None, msg) + yield SQLResult(status=msg) - def change_redirect_format(self, arg: str, **_) -> Generator[tuple, None, None]: + def change_redirect_format(self, arg: str, **_) -> Generator[SQLResult, None, None]: try: self.redirect_formatter.format_name = arg - yield (None, None, None, f"Changed redirect format to {arg}") + yield SQLResult(status=f"Changed redirect format to {arg}") except ValueError: msg = f"Redirect format {arg} not recognized. Allowed formats:" for table_type in self.redirect_formatter.supported_formats: msg += f"\n\t{table_type}" - yield (None, None, None, msg) + yield SQLResult(status=msg) - def change_db(self, arg: str, **_) -> Generator[tuple, None, None]: + def change_db(self, arg: str, **_) -> Generator[SQLResult, None, None]: if arg.startswith("`") and arg.endswith("`"): arg = re.sub(r"^`(.*)`$", r"\1", arg) arg = re.sub(r"``", r"`", arg) @@ -331,40 +332,35 @@ def change_db(self, arg: str, **_) -> Generator[tuple, None, None]: self.sqlexecute.change_db(arg) msg = f'You are now connected to database "{self.sqlexecute.dbname}" as user "{self.sqlexecute.user}"' - yield ( - None, - None, - None, - msg, - ) + yield SQLResult(status=msg) - def execute_from_file(self, arg: str, **_) -> Iterable[tuple]: + def execute_from_file(self, arg: str, **_) -> Iterable[SQLResult]: if not arg: message = "Missing required argument: filename." - return [(None, None, None, message)] + return [SQLResult(status=message)] try: with open(os.path.expanduser(arg)) as f: query = f.read() except IOError as e: - return [(None, None, None, str(e))] + return [SQLResult(status=str(e))] if self.destructive_warning and confirm_destructive_query(query) is False: message = "Wise choice. Command execution stopped." - return [(None, None, None, message)] + return [SQLResult(status=message)] assert isinstance(self.sqlexecute, SQLExecute) return self.sqlexecute.run(query) - def change_prompt_format(self, arg: str, **_) -> list[tuple]: + def change_prompt_format(self, arg: str, **_) -> list[SQLResult]: """ Change the prompt format. """ if not arg: message = "Missing required argument, format." - return [(None, None, None, message)] + return [SQLResult(status=message)] self.prompt_format = self.get_prompt(arg) - return [(None, None, None, f"Changed prompt format to {arg}")] + return [SQLResult(status=f"Changed prompt format to {arg}")] def initialize_logging(self) -> None: log_file = os.path.expanduser(self.config["main"]["log_file"]) @@ -818,7 +814,7 @@ def show_suggestion_tip() -> bool: # mutating if any one of the component statements is mutating mutating = False - def output_res(res: Generator[tuple], start: float) -> None: + def output_res(res: Generator[SQLResult], start: float) -> None: nonlocal mutating result_count = 0 for title, cur, headers, status in res: @@ -1272,7 +1268,7 @@ def configure_pager(self) -> None: if cnf["skip-pager"] or not self.config["main"].as_bool("enable_pager"): special.disable_pager() - def refresh_completions(self, reset: bool = False) -> list[tuple]: + def refresh_completions(self, reset: bool = False) -> list[SQLResult]: if reset: with self._completer_lock: self.completer.reset_completions() @@ -1287,7 +1283,7 @@ def refresh_completions(self, reset: bool = False) -> list[tuple]: }, ) - return [(None, None, None, "Auto-completion refresh started in the background.")] + return [SQLResult(status="Auto-completion refresh started in the background.")] def _on_completions_refreshed(self, new_completer: SQLCompleter) -> None: """Swap the completer object in cli with the newly created completer.""" diff --git a/mycli/packages/special/dbcommands.py b/mycli/packages/special/dbcommands.py index 1f07093a..d156dab1 100644 --- a/mycli/packages/special/dbcommands.py +++ b/mycli/packages/special/dbcommands.py @@ -9,6 +9,7 @@ from mycli.packages.special import iocommands from mycli.packages.special.main import ArgType, special_command from mycli.packages.special.utils import format_uptime +from mycli.packages.sqlresult import SQLResult logger = logging.getLogger(__name__) @@ -19,19 +20,18 @@ def list_tables( arg: str | None = None, _arg_type: ArgType = ArgType.PARSED_QUERY, verbose: bool = False, -) -> list[tuple]: +) -> list[SQLResult]: if arg: query = f'SHOW FIELDS FROM {arg}' else: query = "SHOW TABLES" logger.debug(query) cur.execute(query) - tables = cur.fetchall() status = "" if cur.description: headers = [x[0] for x in cur.description] else: - return [(None, None, None, "")] + return [SQLResult(status="")] if verbose and arg: query = f'SHOW CREATE TABLE {arg}' @@ -40,25 +40,25 @@ def list_tables( if one := cur.fetchone(): status = one[1] - return [(None, tables, headers, status)] + return [SQLResult(cursor=cur, headers=headers, status=status)] @special_command("\\l", "\\l", "List databases.", arg_type=ArgType.RAW_QUERY, case_sensitive=True) -def list_databases(cur: Cursor, **_) -> list[tuple]: +def list_databases(cur: Cursor, **_) -> list[SQLResult]: query = "SHOW DATABASES" logger.debug(query) cur.execute(query) if cur.description: headers = [x[0] for x in cur.description] - return [(None, cur, headers, "")] + return [SQLResult(cursor=cur, headers=headers, status="")] else: - return [(None, None, None, "")] + return [SQLResult(status="")] @special_command( "status", "\\s", "Get status information from the server.", arg_type=ArgType.RAW_QUERY, aliases=["\\s"], case_sensitive=True ) -def status(cur: Cursor, **_) -> list[tuple]: +def status(cur: Cursor, **_) -> list[SQLResult]: query = "SHOW GLOBAL STATUS;" logger.debug(query) try: @@ -167,4 +167,4 @@ def status(cur: Cursor, **_) -> list[tuple]: footer.append("\n" + stats_str) footer.append("--------------") - return [("\n".join(title), output, "", "\n".join(footer))] + return [SQLResult(title="\n".join(title), cursor=output, headers="", status="\n".join(footer))] diff --git a/mycli/packages/special/delimitercommand.py b/mycli/packages/special/delimitercommand.py index 4e24ac3e..c5436f9b 100644 --- a/mycli/packages/special/delimitercommand.py +++ b/mycli/packages/special/delimitercommand.py @@ -5,6 +5,8 @@ import sqlparse +from mycli.packages.sqlresult import SQLResult + class DelimiterCommand: def __init__(self) -> None: @@ -55,7 +57,7 @@ def queries_iter(self, input_str: str) -> Generator[str, None, None]: combined_statement += delimiter queries = self._split(combined_statement)[1:] - def set(self, arg: str, **_) -> list[tuple[None, None, None, str]]: + def set(self, arg: str, **_) -> list[SQLResult]: """Change delimiter. Since `arg` is everything that follows the DELIMITER token @@ -67,14 +69,14 @@ def set(self, arg: str, **_) -> list[tuple[None, None, None, str]]: match = arg and re.search(r"[^\s]+", arg) if not match: message = "Missing required argument, delimiter" - return [(None, None, None, message)] + return [SQLResult(status=message)] delimiter = match.group() if delimiter.lower() == "delimiter": - return [(None, None, None, 'Invalid delimiter "delimiter"')] + return [SQLResult(status='Invalid delimiter "delimiter"')] self._delimiter = delimiter - return [(None, None, None, f'Changed delimiter to {delimiter}')] + return [SQLResult(status=f'Changed delimiter to {delimiter}')] @property def current(self) -> str: diff --git a/mycli/packages/special/iocommands.py b/mycli/packages/special/iocommands.py index c2b97ae7..e7238d64 100644 --- a/mycli/packages/special/iocommands.py +++ b/mycli/packages/special/iocommands.py @@ -21,6 +21,7 @@ from mycli.packages.special.favoritequeries import FavoriteQueries from mycli.packages.special.main import ArgType, special_command from mycli.packages.special.utils import handle_cd_command +from mycli.packages.sqlresult import SQLResult TIMING_ENABLED = False use_expanded_output = False @@ -76,7 +77,7 @@ def is_show_favorite_query() -> bool: aliases=["\\P"], case_sensitive=True, ) -def set_pager(arg: str, **_) -> list[tuple]: +def set_pager(arg: str, **_) -> list[SQLResult]: if arg: os.environ["PAGER"] = arg msg = f"PAGER set to {arg}." @@ -89,22 +90,22 @@ def set_pager(arg: str, **_) -> list[tuple]: msg = "Pager enabled." set_pager_enabled(True) - return [(None, None, None, msg)] + return [SQLResult(status=msg)] @special_command("nopager", "\\n", "Disable pager, print to stdout.", arg_type=ArgType.NO_QUERY, aliases=["\\n"], case_sensitive=True) -def disable_pager() -> list[tuple]: +def disable_pager() -> list[SQLResult]: set_pager_enabled(False) - return [(None, None, None, "Pager disabled.")] + return [SQLResult(status="Pager disabled.")] @special_command("\\timing", "\\t", "Toggle timing of commands.", arg_type=ArgType.NO_QUERY, aliases=["\\t"], case_sensitive=True) -def toggle_timing() -> list[tuple]: +def toggle_timing() -> list[SQLResult]: global TIMING_ENABLED TIMING_ENABLED = not TIMING_ENABLED message = "Timing is " message += "on." if TIMING_ENABLED else "off." - return [(None, None, None, message)] + return [SQLResult(status=message)] def is_timing_enabled() -> bool: @@ -249,7 +250,7 @@ def set_redirect(command_part: str | None, file_operator_part: str | None, file_ @special_command("\\f", "\\f [name [args..]]", "List or execute favorite queries.", arg_type=ArgType.PARSED_QUERY, case_sensitive=True) -def execute_favorite_query(cur: Cursor, arg: str, **_) -> Generator[tuple, None, None]: +def execute_favorite_query(cur: Cursor, arg: str, **_) -> Generator[SQLResult, None, None]: """Returns (title, rows, headers, status)""" if arg == "": for result in list_favorite_queries(): @@ -262,11 +263,11 @@ def execute_favorite_query(cur: Cursor, arg: str, **_) -> Generator[tuple, None, query = FavoriteQueries.instance.get(name) if query is None: message = f"No favorite query: {name}" - yield (None, None, None, message) + yield SQLResult(status=message) else: query, arg_error = subst_favorite_query_args(query, args) if query is None: - yield (None, None, None, arg_error) + yield SQLResult(status=arg_error) else: for sql in sqlparse.split(query): sql = sql.rstrip(";") @@ -274,12 +275,12 @@ def execute_favorite_query(cur: Cursor, arg: str, **_) -> Generator[tuple, None, cur.execute(sql) if cur.description: headers = [x[0] for x in cur.description] - yield (title, cur, headers, None) + yield SQLResult(title=title, cursor=cur, headers=headers) else: - yield (title, None, None, None) + yield SQLResult(title=title) -def list_favorite_queries() -> list[tuple]: +def list_favorite_queries() -> list[SQLResult]: """List of all favorite queries. Returns (title, rows, headers, status)""" @@ -290,7 +291,7 @@ def list_favorite_queries() -> list[tuple]: status = "\nNo favorite queries found." + FavoriteQueries.instance.usage else: status = "" - return [("", rows, headers, status)] + return [SQLResult(title="", cursor=rows, headers=headers, status=status)] def subst_favorite_query_args(query: str, args: list[str]) -> list[str | None]: @@ -310,51 +311,51 @@ def subst_favorite_query_args(query: str, args: list[str]) -> list[str | None]: @special_command("\\fs", "\\fs name query", "Save a favorite query.") -def save_favorite_query(arg: str, **_) -> list[tuple]: +def save_favorite_query(arg: str, **_) -> list[SQLResult]: """Save a new favorite query. Returns (title, rows, headers, status)""" usage = "Syntax: \\fs name query.\n\n" + FavoriteQueries.instance.usage if not arg: - return [(None, None, None, usage)] + return [SQLResult(status=usage)] name, _separator, query = arg.partition(" ") # If either name or query is missing then print the usage and complain. if (not name) or (not query): - return [(None, None, None, usage + "Err: Both name and query are required.")] + return [SQLResult(status=f"{usage} Err: Both name and query are required.")] FavoriteQueries.instance.save(name, query) - return [(None, None, None, "Saved.")] + return [SQLResult(status="Saved.")] @special_command("\\fd", "\\fd [name]", "Delete a favorite query.") -def delete_favorite_query(arg: str, **_) -> list[tuple]: +def delete_favorite_query(arg: str, **_) -> list[SQLResult]: """Delete an existing favorite query.""" usage = "Syntax: \\fd name.\n\n" + FavoriteQueries.instance.usage if not arg: - return [(None, None, None, usage)] + return [SQLResult(status=usage)] status = FavoriteQueries.instance.delete(arg) - return [(None, None, None, status)] + return [SQLResult(status=status)] @special_command("system", "system [command]", "Execute a system shell commmand.") -def execute_system_command(arg: str, **_) -> list[tuple]: +def execute_system_command(arg: str, **_) -> list[SQLResult]: """Execute a system shell command.""" usage = "Syntax: system [command].\n" if not arg: - return [(None, None, None, usage)] + return [SQLResult(status=usage)] try: command = arg.strip() if command.startswith("cd"): ok, error_message = handle_cd_command(arg) if not ok: - return [(None, None, None, error_message)] - return [(None, None, None, "")] + return [SQLResult(status=error_message)] + return [SQLResult(status="")] args = arg.split(" ") process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) @@ -364,9 +365,9 @@ def execute_system_command(arg: str, **_) -> list[tuple]: encoding = locale.getpreferredencoding(False) response_str = response.decode(encoding) - return [(None, None, None, response_str)] + return [SQLResult(status=response_str)] except OSError as e: - return [(None, None, None, f"OSError: {e.strerror}")] + return [SQLResult(status=f"OSError: {e.strerror}")] def parseargfile(arg: str) -> tuple[str, str]: @@ -384,7 +385,7 @@ def parseargfile(arg: str) -> tuple[str, str]: @special_command("tee", "tee [-o] filename", "Append all results to an output file (overwrite using -o).") -def set_tee(arg: str, **_) -> list[tuple]: +def set_tee(arg: str, **_) -> list[SQLResult]: global tee_file try: @@ -392,7 +393,7 @@ def set_tee(arg: str, **_) -> list[tuple]: except (IOError, OSError) as e: raise OSError(f"Cannot write to file '{e.filename}': {e.strerror}") from e - return [(None, None, None, "")] + return [SQLResult(status="")] def close_tee() -> None: @@ -403,9 +404,9 @@ def close_tee() -> None: @special_command("notee", "notee", "Stop writing results to an output file.") -def no_tee(arg: str, **_) -> list[tuple]: +def no_tee(arg: str, **_) -> list[SQLResult]: close_tee() - return [(None, None, None, "")] + return [SQLResult(status="")] def write_tee(output: str) -> None: @@ -417,7 +418,7 @@ def write_tee(output: str) -> None: @special_command("\\once", "\\o [-o] filename", "Append next result to an output file (overwrite using -o).", aliases=["\\o"]) -def set_once(arg: str, **_) -> list[tuple]: +def set_once(arg: str, **_) -> list[SQLResult]: global once_file, written_to_once_file try: @@ -426,7 +427,7 @@ def set_once(arg: str, **_) -> list[tuple]: raise OSError(f"Cannot write to file '{e.filename}': {e.strerror}") from e written_to_once_file = False - return [(None, None, None, "")] + return [SQLResult(status="")] def is_redirected() -> bool: @@ -470,7 +471,7 @@ def _run_post_redirect_hook(post_redirect_command: str, filename: str) -> None: @special_command("\\pipe_once", "\\| command", "Send next result to a subprocess.", aliases=["\\|"]) -def set_pipe_once(arg: str, **_) -> list[tuple]: +def set_pipe_once(arg: str, **_) -> list[SQLResult]: if not arg: raise OSError("pipe_once requires a command") if WIN: @@ -488,7 +489,7 @@ def set_pipe_once(arg: str, **_) -> list[tuple]: encoding="UTF-8", universal_newlines=True, ) - return [(None, None, None, "")] + return [SQLResult(status="")] def write_pipe_once(line: str) -> None: @@ -529,14 +530,14 @@ def flush_pipe_once_if_written(post_redirect_command: str) -> None: @special_command("watch", "watch [seconds] [-c] query", "Executes the query every [seconds] seconds (by default 5).") -def watch_query(arg: str, **kwargs) -> Generator[tuple, None, None]: +def watch_query(arg: str, **kwargs) -> Generator[SQLResult, None, None]: usage = """Syntax: watch [seconds] [-c] query. * seconds: The interval at the query will be repeated, in seconds. By default 5. * -c: Clears the screen between every iteration. """ if not arg: - yield (None, None, None, usage) + yield SQLResult(status=usage) return seconds = 5.0 clear_screen = False @@ -545,7 +546,7 @@ def watch_query(arg: str, **kwargs) -> Generator[tuple, None, None]: arg = arg.strip() if not arg: # Oops, we parsed all the arguments without finding a statement - yield (None, None, None, usage) + yield SQLResult(status=usage) return (left_arg, _, right_arg) = arg.partition(" ") arg = right_arg @@ -578,9 +579,9 @@ def watch_query(arg: str, **kwargs) -> Generator[tuple, None, None]: cur.execute(sql) if cur.description: headers = [x[0] for x in cur.description] - yield (title, cur, headers, None) + yield SQLResult(title=title, cursor=cur, headers=headers) else: - yield (title, None, None, None) + yield SQLResult(title=title) sleep(seconds) except KeyboardInterrupt: # This prints the Ctrl-C character in its own line, which prevents @@ -592,7 +593,7 @@ def watch_query(arg: str, **kwargs) -> Generator[tuple, None, None]: @special_command("delimiter", None, "Change SQL delimiter.") -def set_delimiter(arg: str, **_) -> list[tuple]: +def set_delimiter(arg: str, **_) -> list[SQLResult]: return delimiter_command.set(arg) diff --git a/mycli/packages/special/main.py b/mycli/packages/special/main.py index 19998d69..4f85b6d7 100644 --- a/mycli/packages/special/main.py +++ b/mycli/packages/special/main.py @@ -4,6 +4,8 @@ import os from typing import Callable +from mycli.packages.sqlresult import SQLResult + try: if not os.environ.get('MYCLI_LLM_OFF'): import llm # noqa: F401 @@ -119,7 +121,7 @@ def register_special_command( ) -def execute(cur: Cursor, sql: str) -> list[tuple]: +def execute(cur: Cursor, sql: str) -> list[SQLResult]: """Execute a special command and return the results. If the special command is not supported a CommandNotFound will be raised. """ @@ -151,17 +153,17 @@ def execute(cur: Cursor, sql: str) -> list[tuple]: @special_command("help", "\\?", "Show this help.", arg_type=ArgType.NO_QUERY, aliases=["\\?", "?"]) -def show_help(*_args) -> list[tuple]: +def show_help(*_args) -> list[SQLResult]: headers = ["Command", "Shortcut", "Description"] result = [] for _, value in sorted(COMMANDS.items()): if not value.hidden: result.append((value.command, value.shortcut, value.description)) - return [(None, result, headers, None)] + return [SQLResult(cursor=result, headers=headers)] -def show_keyword_help(cur: Cursor, arg: str) -> list[tuple]: +def show_keyword_help(cur: Cursor, arg: str) -> list[SQLResult]: """ Call the built-in "show ", to display help for an SQL keyword. :param cur: cursor @@ -174,9 +176,9 @@ def show_keyword_help(cur: Cursor, arg: str) -> list[tuple]: cur.execute(query) if cur.description and cur.rowcount > 0: headers = [x[0] for x in cur.description] - return [(None, cur, headers, "")] + return [SQLResult(cursor=cur, headers=headers, status="")] else: - return [(None, None, None, f'No help found for {keyword}.')] + return [SQLResult(status=f'No help found for {keyword}.')] @special_command("exit", "\\q", "Exit.", arg_type=ArgType.NO_QUERY, aliases=["\\q"]) diff --git a/mycli/packages/sqlresult.py b/mycli/packages/sqlresult.py new file mode 100644 index 00000000..cc8d5b25 --- /dev/null +++ b/mycli/packages/sqlresult.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass + +from pymysql.cursors import Cursor + + +@dataclass +class SQLResult: + title: str | None = None + cursor: Cursor | list[tuple] | None = None + headers: list[str] | str | None = None + status: str | None = None + + def __iter__(self): + return iter((self.title, self.cursor, self.headers, self.status)) + + def __str__(self): + return f"{self.title}, {self.cursor}, {self.headers}, {self.status}" diff --git a/mycli/sqlexecute.py b/mycli/sqlexecute.py index 2a869190..9e9f87fa 100644 --- a/mycli/sqlexecute.py +++ b/mycli/sqlexecute.py @@ -15,6 +15,7 @@ from mycli.packages.special import iocommands from mycli.packages.special.main import CommandNotFound, execute +from mycli.packages.sqlresult import SQLResult try: import paramiko # noqa: F401 @@ -327,7 +328,7 @@ def connect( self.reset_connection_id() self.server_info = ServerInfo.from_version_string(conn.server_version) # type: ignore[attr-defined] - def run(self, statement: str) -> Generator[tuple, None, None]: + def run(self, statement: str) -> Generator[SQLResult, None, None]: """Execute the sql in the database and return the results. The results are a list of tuples. Each tuple has 4 values (title, rows, headers, status). @@ -336,7 +337,7 @@ def run(self, statement: str) -> Generator[tuple, None, None]: # Remove spaces and EOL statement = statement.strip() if not statement: # Empty string - yield (None, None, None, None) + yield SQLResult() # Split the sql into separate queries and run each one. # Unless it's saving a favorite query, in which case we @@ -376,7 +377,7 @@ def run(self, statement: str) -> Generator[tuple, None, None]: if not cur.nextset() or (not cur.rowcount and cur.description is None): break - def get_result(self, cursor: Cursor) -> tuple: + def get_result(self, cursor: Cursor) -> SQLResult: """Get the current result's data from the cursor.""" title = headers = None @@ -394,7 +395,7 @@ def get_result(self, cursor: Cursor) -> tuple: plural = '' if cursor.warning_count == 1 else 's' status = f'{status}, {cursor.warning_count} warning{plural}' - return (title, cursor, headers, status) + return SQLResult(title=title, cursor=cursor, headers=headers, status=status) def tables(self) -> Generator[tuple[str], None, None]: """Yields table names""" diff --git a/test/test_completion_refresher.py b/test/test_completion_refresher.py index 9819ee50..4437d8a0 100644 --- a/test/test_completion_refresher.py +++ b/test/test_completion_refresher.py @@ -48,9 +48,10 @@ def test_refresh_called_once(refresher): with patch.object(refresher, "_bg_refresh") as bg_refresh: actual = refresher.refresh(sqlexecute, callbacks) time.sleep(1) # Wait for the thread to work. - assert len(actual) == 1 - assert len(actual[0]) == 4 - assert actual[0][3] == "Auto-completion refresh started in the background." + assert actual[0].title is None + assert actual[0].cursor is None + assert actual[0].headers is None + assert actual[0].status == "Auto-completion refresh started in the background." bg_refresh.assert_called_with(sqlexecute, callbacks, {}) @@ -72,15 +73,17 @@ def dummy_bg_refresh(*args): actual1 = refresher.refresh(sqlexecute, callbacks) time.sleep(1) # Wait for the thread to work. - assert len(actual1) == 1 - assert len(actual1[0]) == 4 - assert actual1[0][3] == "Auto-completion refresh started in the background." + assert actual1[0].title is None + assert actual1[0].cursor is None + assert actual1[0].headers is None + assert actual1[0].status == "Auto-completion refresh started in the background." actual2 = refresher.refresh(sqlexecute, callbacks) time.sleep(1) # Wait for the thread to work. - assert len(actual2) == 1 - assert len(actual2[0]) == 4 - assert actual2[0][3] == "Auto-completion refresh restarted." + assert actual2[0].title is None + assert actual2[0].cursor is None + assert actual2[0].headers is None + assert actual2[0].status == "Auto-completion refresh restarted." def test_refresh_with_callbacks(refresher): diff --git a/test/test_main.py b/test/test_main.py index f513ebde..0c287fde 100644 --- a/test/test_main.py +++ b/test/test_main.py @@ -118,7 +118,7 @@ def test_reconnect_no_database(executor, capsys): sql = "\\r" result = next(mycli.packages.special.execute(executor, sql)) stdout, _stderr = capsys.readouterr() - assert result[-1] is None + assert result.status is None assert "Already connected" in stdout @@ -150,7 +150,7 @@ def test_reconnect_with_different_database(executor): _result_1 = next(mycli.packages.special.execute(executor, sql_1)) result_2 = next(mycli.packages.special.execute(executor, sql_2)) expected = f'You are now connected to database "{database_2}" as user "{USER}"' - assert expected in result_2[-1] + assert expected in result_2.status @dbtest @@ -180,7 +180,7 @@ def test_reconnect_with_same_database(executor): sql = f"\\r {database}" result = next(mycli.packages.special.execute(executor, sql)) expected = f'You are already connected to database "{database}" as user "{USER}"' - assert expected in result[-1] + assert expected in result.status @dbtest diff --git a/test/test_special_iocommands.py b/test/test_special_iocommands.py index 1a738484..7baade16 100644 --- a/test/test_special_iocommands.py +++ b/test/test_special_iocommands.py @@ -109,7 +109,7 @@ def test_favorite_query(): with db_connection().cursor() as cur: query = 'select "✔"' mycli.packages.special.execute(cur, f"\\fs check {query}") - assert next(mycli.packages.special.execute(cur, "\\f check"))[0] == "> " + query + assert next(mycli.packages.special.execute(cur, "\\f check")).title == "> " + query def test_once_command(): @@ -201,8 +201,8 @@ def test_watch_query_iteration(): expected_title = f"> {query}" with db_connection().cursor() as cur: result = next(mycli.packages.special.iocommands.watch_query(arg=query, cur=cur)) - assert result[0] == expected_title - assert result[2][0] == expected_value + assert result.title == expected_title + assert result.headers[0] == expected_value @dbtest @@ -229,8 +229,8 @@ def test_watch_query_full(): ctrl_c_process.join(1) assert len(results) in expected_results for result in results: - assert result[0] == expected_title - assert result[2][0] == expected_value + assert result.title == expected_title + assert result.headers[0] == expected_value @dbtest diff --git a/test/test_tabular_output.py b/test/test_tabular_output.py index d980fb55..48146bbe 100644 --- a/test/test_tabular_output.py +++ b/test/test_tabular_output.py @@ -8,6 +8,7 @@ import pytest from mycli.main import MyCli +from mycli.packages.sqlresult import SQLResult from test.utils import HOST, PASSWORD, PORT, USER, dbtest @@ -47,7 +48,7 @@ def description(self): return self.description # Test sql-update output format - assert list(mycli.change_table_format("sql-update")) == [(None, None, None, "Changed table format to sql-update")] + assert list(mycli.change_table_format("sql-update")) == [SQLResult(status="Changed table format to sql-update")] mycli.main_formatter.query = "" mycli.redirect_formatter.query = "" output = mycli.format_output(None, FakeCursor(), headers, False, False) @@ -66,7 +67,7 @@ def description(self): , `binary` = X'aabb' WHERE `letters` = 'd';""") # Test sql-update-2 output format - assert list(mycli.change_table_format("sql-update-2")) == [(None, None, None, "Changed table format to sql-update-2")] + assert list(mycli.change_table_format("sql-update-2")) == [SQLResult(None, None, None, "Changed table format to sql-update-2")] mycli.main_formatter.query = "" mycli.redirect_formatter.query = "" output = mycli.format_output(None, FakeCursor(), headers, False, False) @@ -82,7 +83,7 @@ def description(self): , `binary` = X'aabb' WHERE `letters` = 'd' AND `number` = 456;""") # Test sql-insert output format (without table name) - assert list(mycli.change_table_format("sql-insert")) == [(None, None, None, "Changed table format to sql-insert")] + assert list(mycli.change_table_format("sql-insert")) == [SQLResult(None, None, None, "Changed table format to sql-insert")] mycli.main_formatter.query = "" mycli.redirect_formatter.query = "" output = mycli.format_output(None, FakeCursor(), headers, False, False) @@ -92,7 +93,7 @@ def description(self): , ('d', 456, '1', 0.5e0, X'aabb') ;""") # Test sql-insert output format (with table name) - assert list(mycli.change_table_format("sql-insert")) == [(None, None, None, "Changed table format to sql-insert")] + assert list(mycli.change_table_format("sql-insert")) == [SQLResult(None, None, None, "Changed table format to sql-insert")] mycli.main_formatter.query = "SELECT * FROM `table`" mycli.redirect_formatter.query = "SELECT * FROM `table`" output = mycli.format_output(None, FakeCursor(), headers, False, False) @@ -102,7 +103,7 @@ def description(self): , ('d', 456, '1', 0.5e0, X'aabb') ;""") # Test sql-insert output format (with database + table name) - assert list(mycli.change_table_format("sql-insert")) == [(None, None, None, "Changed table format to sql-insert")] + assert list(mycli.change_table_format("sql-insert")) == [SQLResult(None, None, None, "Changed table format to sql-insert")] mycli.main_formatter.query = "SELECT * FROM `database`.`table`" mycli.redirect_formatter.query = "SELECT * FROM `database`.`table`" output = mycli.format_output(None, FakeCursor(), headers, False, False) From 78e5ff54aaccdc168d12ff5590a4499209502fcd Mon Sep 17 00:00:00 2001 From: Scott Nemes Date: Sat, 10 Jan 2026 11:29:45 -0800 Subject: [PATCH 2/4] Fixed changelog; merge borked it --- changelog.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/changelog.md b/changelog.md index d3246323..29bb3edf 100644 --- a/changelog.md +++ b/changelog.md @@ -1,4 +1,5 @@ Upcoming (TBD) +============== Internal -------- @@ -6,6 +7,7 @@ Internal 1.44.1 (2026/01/10) +============== Bug Fixes -------- From 0c0740c5ed6cbd58522d9409f359ae8b068d6666 Mon Sep 17 00:00:00 2001 From: Scott Nemes Date: Sat, 10 Jan 2026 11:31:18 -0800 Subject: [PATCH 3/4] Formatted --- mycli/packages/special/delimitercommand.py | 1 + 1 file changed, 1 insertion(+) diff --git a/mycli/packages/special/delimitercommand.py b/mycli/packages/special/delimitercommand.py index 51a92af9..04b5d330 100644 --- a/mycli/packages/special/delimitercommand.py +++ b/mycli/packages/special/delimitercommand.py @@ -6,6 +6,7 @@ import sqlparse from mycli.packages.sqlresult import SQLResult + sqlparse.engine.grouping.MAX_GROUPING_DEPTH = None # type: ignore[assignment] sqlparse.engine.grouping.MAX_GROUPING_TOKENS = None # type: ignore[assignment] From a4715b0634117d55542c3dae32a5301094cf0f96 Mon Sep 17 00:00:00 2001 From: Scott Nemes Date: Sat, 10 Jan 2026 12:22:15 -0800 Subject: [PATCH 4/4] Renamed SQLResult field to be more generic to reflect the current overloaded usage --- mycli/packages/special/dbcommands.py | 6 +++--- mycli/packages/special/iocommands.py | 6 +++--- mycli/packages/special/main.py | 4 ++-- mycli/packages/sqlresult.py | 6 +++--- mycli/sqlexecute.py | 2 +- test/test_completion_refresher.py | 6 +++--- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/mycli/packages/special/dbcommands.py b/mycli/packages/special/dbcommands.py index d156dab1..c69166cc 100644 --- a/mycli/packages/special/dbcommands.py +++ b/mycli/packages/special/dbcommands.py @@ -40,7 +40,7 @@ def list_tables( if one := cur.fetchone(): status = one[1] - return [SQLResult(cursor=cur, headers=headers, status=status)] + return [SQLResult(results=cur, headers=headers, status=status)] @special_command("\\l", "\\l", "List databases.", arg_type=ArgType.RAW_QUERY, case_sensitive=True) @@ -50,7 +50,7 @@ def list_databases(cur: Cursor, **_) -> list[SQLResult]: cur.execute(query) if cur.description: headers = [x[0] for x in cur.description] - return [SQLResult(cursor=cur, headers=headers, status="")] + return [SQLResult(results=cur, headers=headers, status="")] else: return [SQLResult(status="")] @@ -167,4 +167,4 @@ def status(cur: Cursor, **_) -> list[SQLResult]: footer.append("\n" + stats_str) footer.append("--------------") - return [SQLResult(title="\n".join(title), cursor=output, headers="", status="\n".join(footer))] + return [SQLResult(title="\n".join(title), results=output, headers="", status="\n".join(footer))] diff --git a/mycli/packages/special/iocommands.py b/mycli/packages/special/iocommands.py index 1e12a5f5..c17e5c71 100644 --- a/mycli/packages/special/iocommands.py +++ b/mycli/packages/special/iocommands.py @@ -278,7 +278,7 @@ def execute_favorite_query(cur: Cursor, arg: str, **_) -> Generator[SQLResult, N cur.execute(sql) if cur.description: headers = [x[0] for x in cur.description] - yield SQLResult(title=title, cursor=cur, headers=headers) + yield SQLResult(title=title, results=cur, headers=headers) else: yield SQLResult(title=title) @@ -294,7 +294,7 @@ def list_favorite_queries() -> list[SQLResult]: status = "\nNo favorite queries found." + FavoriteQueries.instance.usage else: status = "" - return [SQLResult(title="", cursor=rows, headers=headers, status=status)] + return [SQLResult(title="", results=rows, headers=headers, status=status)] def subst_favorite_query_args(query: str, args: list[str]) -> list[str | None]: @@ -582,7 +582,7 @@ def watch_query(arg: str, **kwargs) -> Generator[SQLResult, None, None]: cur.execute(sql) if cur.description: headers = [x[0] for x in cur.description] - yield SQLResult(title=title, cursor=cur, headers=headers) + yield SQLResult(title=title, results=cur, headers=headers) else: yield SQLResult(title=title) sleep(seconds) diff --git a/mycli/packages/special/main.py b/mycli/packages/special/main.py index 4f85b6d7..1a04506a 100644 --- a/mycli/packages/special/main.py +++ b/mycli/packages/special/main.py @@ -160,7 +160,7 @@ def show_help(*_args) -> list[SQLResult]: for _, value in sorted(COMMANDS.items()): if not value.hidden: result.append((value.command, value.shortcut, value.description)) - return [SQLResult(cursor=result, headers=headers)] + return [SQLResult(results=result, headers=headers)] def show_keyword_help(cur: Cursor, arg: str) -> list[SQLResult]: @@ -176,7 +176,7 @@ def show_keyword_help(cur: Cursor, arg: str) -> list[SQLResult]: cur.execute(query) if cur.description and cur.rowcount > 0: headers = [x[0] for x in cur.description] - return [SQLResult(cursor=cur, headers=headers, status="")] + return [SQLResult(results=cur, headers=headers, status="")] else: return [SQLResult(status=f'No help found for {keyword}.')] diff --git a/mycli/packages/sqlresult.py b/mycli/packages/sqlresult.py index cc8d5b25..5da243bb 100644 --- a/mycli/packages/sqlresult.py +++ b/mycli/packages/sqlresult.py @@ -6,12 +6,12 @@ @dataclass class SQLResult: title: str | None = None - cursor: Cursor | list[tuple] | None = None + results: Cursor | list[tuple] | None = None headers: list[str] | str | None = None status: str | None = None def __iter__(self): - return iter((self.title, self.cursor, self.headers, self.status)) + return iter((self.title, self.results, self.headers, self.status)) def __str__(self): - return f"{self.title}, {self.cursor}, {self.headers}, {self.status}" + return f"{self.title}, {self.results}, {self.headers}, {self.status}" diff --git a/mycli/sqlexecute.py b/mycli/sqlexecute.py index 9e9f87fa..e29bf1f4 100644 --- a/mycli/sqlexecute.py +++ b/mycli/sqlexecute.py @@ -395,7 +395,7 @@ def get_result(self, cursor: Cursor) -> SQLResult: plural = '' if cursor.warning_count == 1 else 's' status = f'{status}, {cursor.warning_count} warning{plural}' - return SQLResult(title=title, cursor=cursor, headers=headers, status=status) + return SQLResult(title=title, results=cursor, headers=headers, status=status) def tables(self) -> Generator[tuple[str], None, None]: """Yields table names""" diff --git a/test/test_completion_refresher.py b/test/test_completion_refresher.py index 4437d8a0..b94db2ce 100644 --- a/test/test_completion_refresher.py +++ b/test/test_completion_refresher.py @@ -49,7 +49,7 @@ def test_refresh_called_once(refresher): actual = refresher.refresh(sqlexecute, callbacks) time.sleep(1) # Wait for the thread to work. assert actual[0].title is None - assert actual[0].cursor is None + assert actual[0].results is None assert actual[0].headers is None assert actual[0].status == "Auto-completion refresh started in the background." bg_refresh.assert_called_with(sqlexecute, callbacks, {}) @@ -74,14 +74,14 @@ def dummy_bg_refresh(*args): actual1 = refresher.refresh(sqlexecute, callbacks) time.sleep(1) # Wait for the thread to work. assert actual1[0].title is None - assert actual1[0].cursor is None + assert actual1[0].results is None assert actual1[0].headers is None assert actual1[0].status == "Auto-completion refresh started in the background." actual2 = refresher.refresh(sqlexecute, callbacks) time.sleep(1) # Wait for the thread to work. assert actual2[0].title is None - assert actual2[0].cursor is None + assert actual2[0].results is None assert actual2[0].headers is None assert actual2[0].status == "Auto-completion refresh restarted."