diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7f78335..eae6a2b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,27 +1,27 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v2.3.0 + rev: v4.5.0 hooks: - id: check-yaml exclude: "(mkdocs.yml|recipe/meta.yaml)" - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/pre-commit/mirrors-mypy - rev: '' # Use the sha / tag you want to point at + rev: v1.6.1 hooks: - id: mypy additional_dependencies: ['types-requests'] - repo: https://github.com/asottile/pyupgrade - rev: v3.3.1 + rev: v3.15.0 hooks: - id: pyupgrade args: ["--py310-plus"] - repo: https://github.com/akaihola/darker - rev: 1.5.1 + rev: 1.7.2 hooks: - id: darker additional_dependencies: [black==22.10.0] - repo: https://github.com/PyCQA/flake8 - rev: 6.0.0 + rev: 6.1.0 hooks: - id: flake8 diff --git a/conda_auth/cli.py b/conda_auth/cli.py index e452b70..3f6b447 100644 --- a/conda_auth/cli.py +++ b/conda_auth/cli.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import MutableMapping +from typing import Literal import click from conda.base.context import context @@ -15,7 +15,7 @@ HTTP_BASIC_AUTH_NAME, TOKEN_NAME, ) -from .options import CustomOption +from .options import ConditionalOption # Constants AUTH_MANAGER_MAPPING = { @@ -31,133 +31,88 @@ FAILURE_COLOR = "red" -VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys()) - OPTION_DEFAULT = "CONDA_AUTH_DEFAULT" -def parse_channel(ctx, param, value): - """ - Converts the channel name into a Channel object - """ - return Channel(value) - - -class ExtraContext: - """ - Used to provide more information about the running environment - """ - - def __init__(self): - self.used_options = set() - - -def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthManager]: +def get_auth_manager( + auth: str | None = None, + basic: bool | None = None, + token: str | Literal[False] | None = None, + **kwargs, +) -> tuple[str, AuthManager]: """ Based on CLI options provided, return the correct auth manager to use. """ - auth_type = options.get("auth") - - if auth_type is not None: - auth_manager = AUTH_MANAGER_MAPPING.get(auth_type) - if auth_manager is None: - raise CondaAuthError( - f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"' - ) - return auth_type, auth_manager - - # we use http basic auth when "username" or "password" are present - if "basic" in extra_context.used_options: - auth_manager = basic_auth_manager - auth_type = HTTP_BASIC_AUTH_NAME - - # we use token auth when "token" is present - elif "token" in extra_context.used_options: - auth_manager = token_auth_manager - auth_type = TOKEN_NAME - - # raise error if authentication type not found + if auth: # set in .condarc + pass + elif basic: # defined on CLI + auth = HTTP_BASIC_AUTH_NAME + elif token: # defined on CLI + auth = TOKEN_NAME else: + raise CondaAuthError("Missing authentication type.") + + # check if auth defined maps to a valid auth manager + if not (auth_manager := AUTH_MANAGER_MAPPING.get(auth)): raise CondaAuthError( - click.style( - "Please specify an authentication type to use" - " with either the `--basic` or `--token` options.", - fg=FAILURE_COLOR, - ) + "Invalid authentication type. " + f"Valid types are: {set(AUTH_MANAGER_MAPPING)}" ) - return auth_type, auth_manager - - -def get_channel_settings(channel: str) -> MutableMapping[str, str] | None: - """ - Retrieve the channel settings from the context object - """ - for settings in context.channel_settings: - if settings.get("channel") == channel: - return dict(**settings) + return auth, auth_manager -@click.group("auth") -@click.pass_context -def group(ctx): +@click.group("auth", context_settings={"help_option_names": ["-h", "--help"]}) +def auth(): """ Commands for handling authentication within conda """ - ctx.obj = ExtraContext() - - -def auth_wrapper(args): - """Authentication commands for conda""" - group(args=args, prog_name="conda auth", standalone_mode=True) -@group.command("login") -@click.argument("channel", callback=parse_channel) +@auth.command("login") +@click.argument("channel", callback=lambda ctx, param, value: Channel(value)) +@click.option( + "-b", + "--basic", + help="Save login credentials as HTTP basic authentication", + cls=ConditionalOption, + is_flag=True, + mutually_exclusive={"token"}, + not_required_if={"token"}, +) @click.option( "-u", "--username", help="Username to use for private channels using HTTP Basic Authentication", - cls=CustomOption, - prompt=True, - mutually_exclusive=("token",), - prompt_when="basic", + cls=ConditionalOption, + prompt_when={"basic"}, + mutually_exclusive={"token"}, ) @click.option( "-p", "--password", help="Password to use for private channels using HTTP Basic Authentication", - cls=CustomOption, - prompt=True, + cls=ConditionalOption, + prompt_when={"basic"}, hide_input=True, - mutually_exclusive=("token",), - prompt_when="basic", + mutually_exclusive={"token"}, ) @click.option( "-t", "--token", help="Token to use for private channels using an API token", + cls=ConditionalOption, prompt=True, prompt_required=False, - cls=CustomOption, - mutually_exclusive=("username", "password"), + mutually_exclusive={"basic", "username", "password"}, + not_required_if={"basic"}, ) -@click.option( - "-b", - "--basic", - is_flag=True, - cls=CustomOption, - help="Save login credentials as HTTP basic authentication", -) -@click.pass_obj -def login(extra_context: ExtraContext, channel: Channel, **kwargs): +def login(channel: Channel, **kwargs): """ Log in to a channel by storing the credentials or tokens associated with it """ - settings = {key: val for key, val in kwargs.items() if val is not None} - - auth_type, auth_manager = get_auth_manager(settings, extra_context) - username: str | None = auth_manager.store(channel, settings) + auth_type, auth_manager = get_auth_manager(**kwargs) + username: str | None = auth_manager.store(channel, kwargs) click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR)) @@ -171,19 +126,24 @@ def login(extra_context: ExtraContext, channel: Channel, **kwargs): raise CondaAuthError(str(exc)) -@group.command("logout") -@click.argument("channel", callback=parse_channel) -@click.pass_obj -def logout(extra_context: ExtraContext, channel: Channel): +@auth.command("logout") +@click.argument("channel", callback=lambda ctx, param, value: Channel(value)) +def logout(channel: Channel): """ Log out of a channel by removing any credentials or tokens associated with it. """ - settings = get_channel_settings(channel.canonical_name) - - if settings is None: + settings = next( + ( + settings + for settings in context.channel_settings + if settings.get("channel") == channel.canonical_name + ), + None, + ) + if not settings: raise CondaAuthError("Unable to find information about logged in session.") - auth_type, auth_manager = get_auth_manager(settings, extra_context) + auth_type, auth_manager = get_auth_manager(**settings) auth_manager.remove_secret(channel, settings) click.echo(click.style(SUCCESSFUL_LOGOUT_MESSAGE, fg=SUCCESSFUL_COLOR)) diff --git a/conda_auth/options.py b/conda_auth/options.py index 62f9ed3..e9d9020 100644 --- a/conda_auth/options.py +++ b/conda_auth/options.py @@ -1,45 +1,62 @@ """ Module for custom click.Option classes """ +from __future__ import annotations import click +from typing import Any +from collections.abc import Mapping -class CustomOption(click.Option): +class ConditionalOption(click.Option): """ Custom option that does the following things: - - Allows you to define a "mutually_exclusive" tuple so certain options cannot be passed - together - - If ``prompt=True`` is set, can optionally control it to be prompted only in the presence of - other options via ``prompt_when`` - - Adds options which have been passed to ``ctx.obj.used_options`` + - Define ``mutually_exclusive`` options that cannot be passed together + - Control prompting in the presence of other options via ``prompt_when`` """ def __init__(self, *args, **kwargs): - self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) - self.prompt_when = kwargs.pop("prompt_when", None) - help_message = kwargs.get("help", "") + self.not_required_if = set(kwargs.pop("not_required_if", [])) + + self.prompt_when = set(kwargs.pop("prompt_when", [])) + if self.prompt_when: + # ensure prompt text is configured, + # conditionally control whether we prompt in handle_parse_result + kwargs["prompt"] = True + self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) if self.mutually_exclusive: - ex_str = ", ".join(f'"{option}"' for option in self.mutually_exclusive) - kwargs[ - "help" - ] = f"{help_message}; cannot be used with these options: {ex_str}" + # augment help blurb to include details about mutually exclusive options + help_ = kwargs.get("help", "") + mutex = ", ".join(map(repr, self.mutually_exclusive)) + kwargs["help"] = f"{help_}; cannot be used with these options: {mutex}" super().__init__(*args, **kwargs) - def handle_parse_result(self, ctx, opts, args): - if self.name in opts: - ctx.obj.used_options.add(self.name) - - if self.prompt_when is not None and self.prompt_when not in opts: - return None, args - + def handle_parse_result( + self, + ctx: click.Context, + opts: Mapping[str, Any], + args: list[str], + ) -> tuple[Any, list[str]]: + # determine whether mutex has been violated if self.mutually_exclusive.intersection(opts) and self.name in opts: - mutually_exclusive = ", ".join( - f'"{option}"' for option in self.mutually_exclusive - ) - raise click.UsageError( - f'Option "{self.name}" cannot be used with {mutually_exclusive}' + mutex = ", ".join(map(repr, self.mutually_exclusive)) + raise click.UsageError(f"Option {self.name!r} cannot be used with {mutex}") + + # determine whether we want to prompt for this argument + if self.prompt_when and not self.prompt_when.intersection(opts): + self.prompt = None + + if ( + self.not_required_if + and self.name not in opts + and not self.not_required_if.intersection(opts) + ): + required = {self.name, *self.not_required_if} + raise click.MissingParameter( + ctx=ctx, + param_type="option", + param_hint=" / ".join(sorted(map(repr, required))), ) return super().handle_parse_result(ctx, opts, args) diff --git a/conda_auth/hooks.py b/conda_auth/plugin.py similarity index 88% rename from conda_auth/hooks.py rename to conda_auth/plugin.py index 188fce2..e1a614d 100644 --- a/conda_auth/hooks.py +++ b/conda_auth/plugin.py @@ -12,7 +12,7 @@ HTTP_BASIC_AUTH_NAME, TOKEN_NAME, ) -from .cli import auth_wrapper +from .cli import auth from .constants import PLUGIN_NAME ENV_COMMANDS = { @@ -31,7 +31,9 @@ def conda_subcommands(): Registers subcommands """ yield CondaSubcommand( - name="auth", action=auth_wrapper, summary="Authentication commands for conda" + name="auth", + action=lambda args: auth(prog_name="conda auth", args=args), + summary="Authentication commands for conda", ) diff --git a/pyproject.toml b/pyproject.toml index 20ad3cc..0a6b104 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies = [ ] [project.entry-points.conda] -conda-auth = "conda_auth.hooks" +conda-auth = "conda_auth.plugin" [tool.setuptools.packages] find = {} diff --git a/tests/cli/test_group.py b/tests/cli/test_group.py index 93e9c43..eee170a 100644 --- a/tests/cli/test_group.py +++ b/tests/cli/test_group.py @@ -1,6 +1,6 @@ import pytest -from conda_auth.cli import auth_wrapper +from conda_auth.cli import auth def test_auth_wrapper(): @@ -11,4 +11,4 @@ def test_auth_wrapper(): exception. """ with pytest.raises(SystemExit): - auth_wrapper([]) + auth([]) diff --git a/tests/cli/test_login.py b/tests/cli/test_login.py index bc608f5..8186bbb 100644 --- a/tests/cli/test_login.py +++ b/tests/cli/test_login.py @@ -1,4 +1,4 @@ -from conda_auth.cli import group, SUCCESSFUL_LOGIN_MESSAGE +from conda_auth.cli import auth, SUCCESSFUL_LOGIN_MESSAGE from conda_auth.condarc import CondaRCError from conda_auth.exceptions import CondaAuthError @@ -16,7 +16,7 @@ def test_login_basic_auth_no_options(runner, keyring, condarc): # run command result = runner.invoke( - group, ["login", channel_name, "--basic"], input=f"{username}\n{secret}" + auth, ["login", channel_name, "--basic"], input=f"{username}\n{secret}" ) assert result.exit_code == 0 @@ -34,7 +34,7 @@ def test_login_with_options_basic_auth(runner, keyring, condarc): # run command result = runner.invoke( - group, + auth, ["login", channel_name, "--basic", "--username", "test", "--password", "test"], ) @@ -52,12 +52,12 @@ def test_login_with_invalid_auth_type(runner, keyring, condarc): keyring(None) # run command - result = runner.invoke(group, ["login", channel_name]) + result = runner.invoke(auth, ["login", channel_name]) exc_type, exception, _ = result.exc_info - assert result.exit_code == 1 - assert exc_type == CondaAuthError - assert "Please specify an authentication type" in exception.message + assert result.exit_code == 2 + assert exc_type == SystemExit + assert "Error: Missing option 'basic' / 'token'." in result.stdout def test_login_succeeds_error_returned_when_updating_condarc(runner, keyring, condarc): @@ -73,7 +73,7 @@ def test_login_succeeds_error_returned_when_updating_condarc(runner, keyring, co # run command result = runner.invoke( - group, ["login", channel_name, "--basic"], input="user\npassword" + auth, ["login", channel_name, "--basic"], input="user\npassword" ) exc_type, exception, _ = result.exc_info @@ -92,7 +92,7 @@ def test_login_token(mocker, runner, keyring, condarc): mock_context.channel_settings = [] keyring(None) - result = runner.invoke(group, ["login", channel_name, "--token", "token"]) + result = runner.invoke(auth, ["login", channel_name, "--token", "token"]) assert result.exit_code == 0 @@ -106,7 +106,7 @@ def test_login_token_no_options(runner, keyring, condarc): # setup mocks keyring(None) - result = runner.invoke(group, ["login", channel_name, "--token"], input="token\n") + result = runner.invoke(auth, ["login", channel_name, "--token"], input="token\n") assert result.exit_code == 0 assert SUCCESSFUL_LOGIN_MESSAGE in result.output diff --git a/tests/cli/test_logout.py b/tests/cli/test_logout.py index 0ff252e..ecb5137 100644 --- a/tests/cli/test_logout.py +++ b/tests/cli/test_logout.py @@ -1,4 +1,4 @@ -from conda_auth.cli import group, SUCCESSFUL_LOGOUT_MESSAGE +from conda_auth.cli import auth, SUCCESSFUL_LOGOUT_MESSAGE from conda_auth.constants import PLUGIN_NAME from conda_auth.handlers.basic_auth import HTTP_BASIC_AUTH_NAME from conda_auth.exceptions import CondaAuthError @@ -20,7 +20,7 @@ def test_logout_of_active_session(mocker, runner, keyring): ] # run command - result = runner.invoke(group, ["logout", channel_name]) + result = runner.invoke(auth, ["logout", channel_name]) assert SUCCESSFUL_LOGOUT_MESSAGE in result.output assert result.exit_code == 0 @@ -43,7 +43,7 @@ def test_logout_of_non_existing_session(mocker, runner, keyring): mock_context.channel_settings = [] # run command - result = runner.invoke(group, ["logout", channel_name]) + result = runner.invoke(auth, ["logout", channel_name]) exc_type, exception, _ = result.exc_info assert exc_type == CondaAuthError diff --git a/tests/requirements.txt b/tests/requirements.txt new file mode 100644 index 0000000..71ee80d --- /dev/null +++ b/tests/requirements.txt @@ -0,0 +1,5 @@ +click +conda +keyring +requests +ruamel.yaml diff --git a/tests/test_hooks.py b/tests/test_plugin.py similarity index 82% rename from tests/test_hooks.py rename to tests/test_plugin.py index 015bd1c..d14d4db 100644 --- a/tests/test_hooks.py +++ b/tests/test_plugin.py @@ -1,6 +1,6 @@ from conda.cli.conda_argparse import BUILTIN_COMMANDS -from conda_auth import hooks +from conda_auth import plugin from conda_auth.constants import PLUGIN_NAME from conda_auth.handlers import ( HTTP_BASIC_AUTH_NAME, @@ -14,7 +14,7 @@ def test_conda_subcommands_hook(): """ Test to make sure that this hook yields the correct objects. """ - objs = list(hooks.conda_subcommands()) + objs = list(plugin.conda_subcommands()) assert objs[0].name == "auth" assert objs[0].summary == "Authentication commands for conda" @@ -24,9 +24,9 @@ def test_conda_pre_commands_hook(): """ Test to make sure that this hook yields the correct objects. """ - objs = list(hooks.conda_pre_commands()) + objs = list(plugin.conda_pre_commands()) - run_for = BUILTIN_COMMANDS.union(hooks.ENV_COMMANDS) + run_for = BUILTIN_COMMANDS.union(plugin.ENV_COMMANDS) assert objs[0].name == f"{PLUGIN_NAME}-{HTTP_BASIC_AUTH_NAME}" assert objs[0].run_for == run_for @@ -39,7 +39,7 @@ def test_conda_auth_handlers_hook(): """ Test to make sure that this hook yields the correct objects. """ - objs = list(hooks.conda_auth_handlers()) + objs = list(plugin.conda_auth_handlers()) assert objs[0].name == HTTP_BASIC_AUTH_NAME assert objs[0].handler == BasicAuthHandler