diff --git a/setup.py b/setup.py index e11ad5c7..aa56055e 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ packages=['toot', 'toot.tui', 'toot.tui.richtext', 'toot.utils'], python_requires=">=3.7", install_requires=[ + "click~=8.1", "requests>=2.13,<3.0", "beautifulsoup4>=4.5.0,<5.0", "wcwidth>=0.1.7", @@ -62,7 +63,7 @@ }, entry_points={ 'console_scripts': [ - 'toot=toot.console:main', + 'toot=toot.cli:cli', ], } ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8fcd1cb1..dc387ead 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,8 +20,10 @@ import pytest import uuid +from click.testing import CliRunner, Result from pathlib import Path from toot import api, App, User +from toot.cli import Context from toot.console import run_command from toot.exceptions import ApiError, ConsoleError from toot.output import print_out @@ -105,19 +107,21 @@ def friend_id(app, user, friend): return api.find_account(app, user, friend.username)["id"] +@pytest.fixture(scope="session", autouse=True) +def testing_env(): + os.environ["TOOT_TESTING"] = "true" + + +@pytest.fixture(scope="session") +def runner(): + return CliRunner(mix_stderr=False) + + @pytest.fixture -def run(app, user, capsys): - def _run(command, *params, as_user=None): - # The try/catch duplicates logic from console.main to convert exceptions - # to printed error messages. TODO: could be deduped - try: - run_command(app, as_user or user, command, params or []) - except (ConsoleError, ApiError) as e: - print_out(str(e)) - - out, err = capsys.readouterr() - assert err == "" - return strip_ansi(out) +def run(app, user, runner): + def _run(command, *params, as_user=None) -> Result: + ctx = Context(app, as_user or user) + return runner.invoke(command, params, obj=ctx) return _run @@ -130,12 +134,10 @@ def _run_json(command, *params): @pytest.fixture -def run_anon(capsys): - def _run(command, *params): - run_command(None, None, command, params or []) - out, err = capsys.readouterr() - assert err == "" - return strip_ansi(out) +def run_anon(runner): + def _run(command, *params) -> Result: + ctx = Context(None, None) + return runner.invoke(command, params, obj=ctx) return _run diff --git a/tests/integration/test_read.py b/tests/integration/test_read.py index 67e77836..a9bb914d 100644 --- a/tests/integration/test_read.py +++ b/tests/integration/test_read.py @@ -1,45 +1,58 @@ import json -from pprint import pprint -import pytest import re -from toot import api -from toot.entities import Account, from_dict_list -from toot.exceptions import ConsoleError +from toot import api, cli +from toot.entities import Account, Status, from_dict, from_dict_list from uuid import uuid4 def test_instance(app, run): - out = run("instance", "--disable-https") - assert "Mastodon" in out - assert app.instance in out - assert "running Mastodon" in out + result = run(cli.instance) + assert result.exit_code == 0 + + assert "Mastodon" in result.stdout + assert app.instance in result.stdout + assert "running Mastodon" in result.stdout def test_instance_json(app, run): - out = run("instance", "--json") - data = json.loads(out) + result = run(cli.instance, "--json") + assert result.exit_code == 0 + + data = json.loads(result.stdout) assert data["title"] is not None assert data["description"] is not None assert data["version"] is not None def test_instance_anon(app, run_anon, base_url): - out = run_anon("instance", base_url) - assert "Mastodon" in out - assert app.instance in out - assert "running Mastodon" in out + result = run_anon(cli.instance, base_url) + assert result.exit_code == 0 + + assert "Mastodon" in result.stdout + assert app.instance in result.stdout + assert "running Mastodon" in result.stdout # Need to specify the instance name when running anon - with pytest.raises(ConsoleError) as exc: - run_anon("instance") - assert str(exc.value) == "Please specify an instance." + result = run_anon(cli.instance) + assert result.exit_code == 1 + assert result.stderr == "Error: Please specify an instance.\n" def test_whoami(user, run): - out = run("whoami") - # TODO: test other fields once updating account is supported - assert f"@{user.username}" in out + result = run(cli.whoami) + assert result.exit_code == 0 + assert f"@{user.username}" in result.stdout + + +def test_whoami_json(user, run): + result = run(cli.whoami, "--json") + assert result.exit_code == 0 + + data = json.loads(result.stdout) + account = from_dict(Account, data) + assert account.username == user.username + assert account.acct == user.username def test_whois(app, friend, run): @@ -51,18 +64,33 @@ def test_whois(app, friend, run): ] for username in variants: - out = run("whois", username) - assert f"@{friend.username}" in out + result = run(cli.whois, username) + assert result.exit_code == 0 + assert f"@{friend.username}" in result.stdout + + +def test_whois_json(app, friend, run): + result = run(cli.whois, friend.username, "--json") + assert result.exit_code == 0 + + data = json.loads(result.stdout) + account = from_dict(Account, data) + assert account.username == friend.username + assert account.acct == friend.username def test_search_account(friend, run): - out = run("search", friend.username) - assert out == f"Accounts:\n* @{friend.username}" + result = run(cli.search, friend.username) + assert result.exit_code == 0 + assert result.stdout.strip() == f"Accounts:\n* @{friend.username}" + +def test_search_account_json(friend, run): + result = run(cli.search, friend.username, "--json") + assert result.exit_code == 0 -def test_search_account_json(friend, run_json): - out = run_json("search", friend.username, "--json") - [account] = from_dict_list(Account, out["accounts"]) + data = json.loads(result.stdout) + [account] = from_dict_list(Account, data["accounts"]) assert account.acct == friend.username @@ -71,17 +99,21 @@ def test_search_hashtag(app, user, run): api.post_status(app, user, "#hashtag_y") api.post_status(app, user, "#hashtag_z") - out = run("search", "#hashtag") - assert out == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z" + result = run(cli.search, "#hashtag") + assert result.exit_code == 0 + assert result.stdout.strip() == "Hashtags:\n#hashtag_x, #hashtag_y, #hashtag_z" -def test_search_hashtag_json(app, user, run_json): +def test_search_hashtag_json(app, user, run): api.post_status(app, user, "#hashtag_x") api.post_status(app, user, "#hashtag_y") api.post_status(app, user, "#hashtag_z") - out = run_json("search", "#hashtag", "--json") - [h1, h2, h3] = sorted(out["hashtags"], key=lambda h: h["name"]) + result = run(cli.search, "#hashtag", "--json") + assert result.exit_code == 0 + + data = json.loads(result.stdout) + [h1, h2, h3] = sorted(data["hashtags"], key=lambda h: h["name"]) assert h1["name"] == "hashtag_x" assert h2["name"] == "hashtag_y" @@ -89,50 +121,78 @@ def test_search_hashtag_json(app, user, run_json): def test_tags(run, base_url): - out = run("tags_followed") - assert out == "You're not following any hashtags." + result = run(cli.tags_followed) + assert result.exit_code == 0 + assert result.stdout.strip() == "You're not following any hashtags." - out = run("tags_follow", "foo") - assert out == "✓ You are now following #foo" + result = run(cli.tags_follow, "foo") + assert result.exit_code == 0 + assert result.stdout.strip() == "✓ You are now following #foo" - out = run("tags_followed") - assert out == f"* #foo\t{base_url}/tags/foo" + result = run(cli.tags_followed) + assert result.exit_code == 0 + assert result.stdout.strip() == f"* #foo\t{base_url}/tags/foo" - out = run("tags_follow", "bar") - assert out == "✓ You are now following #bar" + result = run(cli.tags_follow, "bar") + assert result.exit_code == 0 + assert result.stdout.strip() == "✓ You are now following #bar" - out = run("tags_followed") - assert out == "\n".join([ + result = run(cli.tags_followed) + assert result.exit_code == 0 + assert result.stdout.strip() == "\n".join([ f"* #bar\t{base_url}/tags/bar", f"* #foo\t{base_url}/tags/foo", ]) - out = run("tags_unfollow", "foo") - assert out == "✓ You are no longer following #foo" + result = run(cli.tags_unfollow, "foo") + assert result.exit_code == 0 + assert result.stdout.strip() == "✓ You are no longer following #foo" - out = run("tags_followed") - assert out == f"* #bar\t{base_url}/tags/bar" + result = run(cli.tags_followed) + assert result.exit_code == 0 + assert result.stdout.strip() == f"* #bar\t{base_url}/tags/bar" def test_status(app, user, run): uuid = str(uuid4()) - response = api.post_status(app, user, uuid).json() + status_id = api.post_status(app, user, uuid).json()["id"] + + result = run(cli.status, status_id) + assert result.exit_code == 0 - out = run("status", response["id"]) + out = result.stdout.strip() assert uuid in out assert user.username in out - assert response["id"] in out + assert status_id in out -def test_thread(app, user, run): +def test_status_json(app, user, run): uuid = str(uuid4()) - s1 = api.post_status(app, user, uuid + "1").json() - s2 = api.post_status(app, user, uuid + "2", in_reply_to_id=s1["id"]).json() - s3 = api.post_status(app, user, uuid + "3", in_reply_to_id=s2["id"]).json() + status_id = api.post_status(app, user, uuid).json()["id"] + + result = run(cli.status, status_id, "--json") + assert result.exit_code == 0 + + status = from_dict(Status, json.loads(result.stdout)) + assert status.id == status_id + assert status.account.acct == user.username + assert uuid in status.content + + +def test_thread(app, user, run): + uuid1 = str(uuid4()) + uuid2 = str(uuid4()) + uuid3 = str(uuid4()) + + s1 = api.post_status(app, user, uuid1).json() + s2 = api.post_status(app, user, uuid2, in_reply_to_id=s1["id"]).json() + s3 = api.post_status(app, user, uuid3, in_reply_to_id=s2["id"]).json() for status in [s1, s2, s3]: - out = run("thread", status["id"]) - bits = re.split(r"─+", out) + result = run(cli.thread, status["id"]) + assert result.exit_code == 0 + + bits = re.split(r"─+", result.stdout.strip()) bits = [b for b in bits if b] assert len(bits) == 3 @@ -141,6 +201,6 @@ def test_thread(app, user, run): assert s2["id"] in bits[1] assert s3["id"] in bits[2] - assert f"{uuid}1" in bits[0] - assert f"{uuid}2" in bits[1] - assert f"{uuid}3" in bits[2] + assert uuid1 in bits[0] + assert uuid2 in bits[1] + assert uuid3 in bits[2] diff --git a/toot/__main__.py b/toot/__main__.py index abbb9e29..403038e6 100644 --- a/toot/__main__.py +++ b/toot/__main__.py @@ -1,3 +1,15 @@ -from .console import main +import sys +from toot.cli import cli +from toot.exceptions import ConsoleError +from toot.output import print_err +from toot.settings import load_settings -main() +try: + defaults = load_settings().get("commands", {}) + cli(default_map=defaults) +except ConsoleError as ex: + print_err(str(ex)) + sys.exit(1) +except KeyboardInterrupt: + print_err("Aborted") + sys.exit(1) diff --git a/toot/cli.py b/toot/cli.py new file mode 100644 index 00000000..dc509271 --- /dev/null +++ b/toot/cli.py @@ -0,0 +1,201 @@ +from itertools import chain +import logging +import sys +import click +import json as pyjson + +from functools import wraps +from toot import App, User, api, config +from toot.entities import Instance, Status, from_dict, Account +from toot.exceptions import ApiError, ConsoleError +from toot.output import print_account, print_instance, print_search_results, print_status, print_tag_list, print_timeline +from typing import Callable, Concatenate, NamedTuple, Optional, ParamSpec, TypeVar + + +# Tweak the Click context +# https://click.palletsprojects.com/en/8.1.x/api/#context +CONTEXT = dict( + # Enable using environment variables to set options + auto_envvar_prefix="TOOT", + # Add shorthand -h for invoking help + help_option_names=["-h", "--help"], + # Give help some more room (default is 80) + max_content_width=100, + # Always show default values for options + show_default=True, +) + + +# Data object to add to Click context +class Context(NamedTuple): + app: Optional[App] = None + user: Optional[User] = None + color: bool = False + debug: bool = False + quiet: bool = False + + +P = ParamSpec("P") +R = TypeVar("R") +T = TypeVar("T") + + +def pass_context(f: Callable[Concatenate[Context, P], R]) -> Callable[P, R]: + """Pass `obj` from click context as first argument.""" + @wraps(f) + def wrapped(*args: P.args, **kwargs: P.kwargs) -> R: + ctx = click.get_current_context() + return f(ctx.obj, *args, **kwargs) + + return wrapped + + +json_option = click.option( + "--json", + is_flag=True, + default=False, + help="Print data as JSON rather than human readable text" +) + + +@click.group(context_settings=CONTEXT) +@click.option("--debug/--no-debug", default=False, help="Log debug info to stderr") +@click.option("--color/--no-color", default=sys.stdout.isatty(), help="Use ANSI color in output") +@click.option("--quiet/--no-quiet", default=False, help="Don't print anything to stdout") +@click.pass_context +def cli(ctx, color, debug, quiet, app=None, user=None): + """Toot is a Mastodon CLI""" + user, app = config.get_active_user_app() + ctx.obj = Context(app, user, color, debug, quiet) + + if debug: + logging.basicConfig(level=logging.DEBUG) + + +@cli.command() +@json_option +@pass_context +def whoami(ctx: Context, json: bool): + """Display logged in user details""" + response = api.verify_credentials(ctx.app, ctx.user) + + if json: + click.echo(response.text) + else: + account = from_dict(Account, response.json()) + print_account(account) + + +@cli.command() +@click.argument("account") +@json_option +@pass_context +def whois(ctx: Context, account: str, json: bool): + """Display account details""" + account_dict = api.find_account(ctx.app, ctx.user, account) + + # Here it's not possible to avoid parsing json since it's needed to find the account. + if json: + click.echo(pyjson.dumps(account_dict)) + else: + account_obj = from_dict(Account, account_dict) + print_account(account_obj) + + +@cli.command() +@click.argument("instance_url", required=False) +@json_option +@pass_context +def instance(ctx: Context, instance_url: Optional[str], json: bool): + """Display instance details""" + default_url = ctx.app.base_url if ctx.app else None + base_url = instance_url or default_url + + if not base_url: + raise ConsoleError("Please specify an instance.") + + try: + response = api.get_instance(base_url) + except ApiError: + raise ConsoleError( + f"Instance not found at {base_url}.\n" + + "The given domain probably does not host a Mastodon instance." + ) + + if json: + print(response.text) + else: + instance = from_dict(Instance, response.json()) + print_instance(instance) + + +@cli.command() +@click.argument("query") +@click.option("-r", "--resolve", is_flag=True, help="Resolve non-local accounts") +@json_option +@pass_context +def search(ctx: Context, query: str, resolve: bool, json: bool): + response = api.search(ctx.app, ctx.user, query, resolve) + if json: + print(response.text) + else: + print_search_results(response.json()) + + +@cli.command() +@click.argument("status_id") +@json_option +@pass_context +def status(ctx: Context, status_id: str, json: bool): + """Show a single status""" + response = api.fetch_status(ctx.app, ctx.user, status_id) + if json: + print(response.text) + else: + status = from_dict(Status, response.json()) + print_status(status) + + +@cli.command() +@click.argument("status_id") +@json_option +@pass_context +def thread(ctx: Context, status_id: str, json: bool): + """Show thread for a toot.""" + context_response = api.context(ctx.app, ctx.user, status_id) + if json: + print(context_response.text) + else: + toot = api.fetch_status(ctx.app, ctx.user, status_id).json() + context = context_response.json() + + statuses = chain(context["ancestors"], [toot], context["descendants"]) + print_timeline(from_dict(Status, s) for s in statuses) + + +@cli.command(name="tags_followed") +@pass_context +def tags_followed(ctx: Context): + """List hashtags you follow""" + response = api.followed_tags(ctx.app, ctx.user) + print_tag_list(response) + + +@cli.command(name="tags_follow") +@click.argument("tag") +@pass_context +def tags_follow(ctx: Context, tag: str): + """Follow a hashtag""" + tag = tag.lstrip("#") + api.follow_tag(ctx.app, ctx.user, tag) + click.secho(f"✓ You are now following #{tag}", fg="green") + + +@cli.command(name="tags_unfollow") +@click.argument("tag") +@pass_context +def tags_unfollow(ctx: Context, tag: str): + """Unfollow a hashtag""" + tag = tag.lstrip("#") + api.unfollow_tag(ctx.app, ctx.user, tag) + click.secho(f"✓ You are no longer following #{tag}", fg="green") diff --git a/toot/exceptions.py b/toot/exceptions.py index 2bf495d6..c5e23500 100644 --- a/toot/exceptions.py +++ b/toot/exceptions.py @@ -1,4 +1,7 @@ -class ApiError(Exception): +from click import ClickException + + +class ApiError(ClickException): """Raised when an API request fails for whatever reason.""" @@ -10,5 +13,5 @@ class AuthenticationError(ApiError): """Raised when login fails.""" -class ConsoleError(Exception): +class ConsoleError(ClickException): """Raised when an error occurs which needs to be show to the user."""