Skip to content

Commit

Permalink
Merge pull request #22 from conda-incubator/code-review
Browse files Browse the repository at this point in the history
Suggested changes for token support (#16)
  • Loading branch information
travishathaway committed Oct 21, 2023
2 parents e6d6ade + df23763 commit ac3c143
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 152 deletions.
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
rev: v4.5.0
hooks:
- id: check-yaml
exclude: "(mkdocs.yml|recipe/meta.yaml)"
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/pre-commit/mirrors-mypy
rev: '' # Use the sha / tag you want to point at
rev: v1.6.1
hooks:
- id: mypy
additional_dependencies: ['types-requests']
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
rev: v3.15.0
hooks:
- id: pyupgrade
args: ["--py310-plus"]
- repo: https://github.com/akaihola/darker
rev: 1.5.1
rev: 1.7.2
hooks:
- id: darker
additional_dependencies: [black==22.10.0]
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
rev: 6.1.0
hooks:
- id: flake8
158 changes: 59 additions & 99 deletions conda_auth/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import MutableMapping
from typing import Literal

import click
from conda.base.context import context
Expand All @@ -15,7 +15,7 @@
HTTP_BASIC_AUTH_NAME,
TOKEN_NAME,
)
from .options import CustomOption
from .options import ConditionalOption

# Constants
AUTH_MANAGER_MAPPING = {
Expand All @@ -31,133 +31,88 @@

FAILURE_COLOR = "red"

VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys())

OPTION_DEFAULT = "CONDA_AUTH_DEFAULT"


def parse_channel(ctx, param, value):
"""
Converts the channel name into a Channel object
"""
return Channel(value)


class ExtraContext:
"""
Used to provide more information about the running environment
"""

def __init__(self):
self.used_options = set()


def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthManager]:
def get_auth_manager(
auth: str | None = None,
basic: bool | None = None,
token: str | Literal[False] | None = None,
**kwargs,
) -> tuple[str, AuthManager]:
"""
Based on CLI options provided, return the correct auth manager to use.
"""
auth_type = options.get("auth")

if auth_type is not None:
auth_manager = AUTH_MANAGER_MAPPING.get(auth_type)
if auth_manager is None:
raise CondaAuthError(
f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"'
)
return auth_type, auth_manager

# we use http basic auth when "username" or "password" are present
if "basic" in extra_context.used_options:
auth_manager = basic_auth_manager
auth_type = HTTP_BASIC_AUTH_NAME

# we use token auth when "token" is present
elif "token" in extra_context.used_options:
auth_manager = token_auth_manager
auth_type = TOKEN_NAME

# raise error if authentication type not found
if auth: # set in .condarc
pass
elif basic: # defined on CLI
auth = HTTP_BASIC_AUTH_NAME
elif token: # defined on CLI
auth = TOKEN_NAME
else:
raise CondaAuthError("Missing authentication type.")

# check if auth defined maps to a valid auth manager
if not (auth_manager := AUTH_MANAGER_MAPPING.get(auth)):
raise CondaAuthError(
click.style(
"Please specify an authentication type to use"
" with either the `--basic` or `--token` options.",
fg=FAILURE_COLOR,
)
"Invalid authentication type. "
f"Valid types are: {set(AUTH_MANAGER_MAPPING)}"
)

return auth_type, auth_manager


def get_channel_settings(channel: str) -> MutableMapping[str, str] | None:
"""
Retrieve the channel settings from the context object
"""
for settings in context.channel_settings:
if settings.get("channel") == channel:
return dict(**settings)
return auth, auth_manager


@click.group("auth")
@click.pass_context
def group(ctx):
@click.group("auth", context_settings={"help_option_names": ["-h", "--help"]})
def auth():
"""
Commands for handling authentication within conda
"""
ctx.obj = ExtraContext()


def auth_wrapper(args):
"""Authentication commands for conda"""
group(args=args, prog_name="conda auth", standalone_mode=True)


@group.command("login")
@click.argument("channel", callback=parse_channel)
@auth.command("login")
@click.argument("channel", callback=lambda ctx, param, value: Channel(value))
@click.option(
"-b",
"--basic",
help="Save login credentials as HTTP basic authentication",
cls=ConditionalOption,
is_flag=True,
mutually_exclusive={"token"},
not_required_if={"token"},
)
@click.option(
"-u",
"--username",
help="Username to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
mutually_exclusive=("token",),
prompt_when="basic",
cls=ConditionalOption,
prompt_when={"basic"},
mutually_exclusive={"token"},
)
@click.option(
"-p",
"--password",
help="Password to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
cls=ConditionalOption,
prompt_when={"basic"},
hide_input=True,
mutually_exclusive=("token",),
prompt_when="basic",
mutually_exclusive={"token"},
)
@click.option(
"-t",
"--token",
help="Token to use for private channels using an API token",
cls=ConditionalOption,
prompt=True,
prompt_required=False,
cls=CustomOption,
mutually_exclusive=("username", "password"),
mutually_exclusive={"basic", "username", "password"},
not_required_if={"basic"},
)
@click.option(
"-b",
"--basic",
is_flag=True,
cls=CustomOption,
help="Save login credentials as HTTP basic authentication",
)
@click.pass_obj
def login(extra_context: ExtraContext, channel: Channel, **kwargs):
def login(channel: Channel, **kwargs):
"""
Log in to a channel by storing the credentials or tokens associated with it
"""
settings = {key: val for key, val in kwargs.items() if val is not None}

auth_type, auth_manager = get_auth_manager(settings, extra_context)
username: str | None = auth_manager.store(channel, settings)
auth_type, auth_manager = get_auth_manager(**kwargs)
username: str | None = auth_manager.store(channel, kwargs)

click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR))

Expand All @@ -171,19 +126,24 @@ def login(extra_context: ExtraContext, channel: Channel, **kwargs):
raise CondaAuthError(str(exc))


@group.command("logout")
@click.argument("channel", callback=parse_channel)
@click.pass_obj
def logout(extra_context: ExtraContext, channel: Channel):
@auth.command("logout")
@click.argument("channel", callback=lambda ctx, param, value: Channel(value))
def logout(channel: Channel):
"""
Log out of a channel by removing any credentials or tokens associated with it.
"""
settings = get_channel_settings(channel.canonical_name)

if settings is None:
settings = next(
(
settings
for settings in context.channel_settings
if settings.get("channel") == channel.canonical_name
),
None,
)
if not settings:
raise CondaAuthError("Unable to find information about logged in session.")

auth_type, auth_manager = get_auth_manager(settings, extra_context)
auth_type, auth_manager = get_auth_manager(**settings)
auth_manager.remove_secret(channel, settings)

click.echo(click.style(SUCCESSFUL_LOGOUT_MESSAGE, fg=SUCCESSFUL_COLOR))
67 changes: 42 additions & 25 deletions conda_auth/options.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,62 @@
"""
Module for custom click.Option classes
"""
from __future__ import annotations
import click
from typing import Any
from collections.abc import Mapping


class CustomOption(click.Option):
class ConditionalOption(click.Option):
"""
Custom option that does the following things:
- Allows you to define a "mutually_exclusive" tuple so certain options cannot be passed
together
- If ``prompt=True`` is set, can optionally control it to be prompted only in the presence of
other options via ``prompt_when``
- Adds options which have been passed to ``ctx.obj.used_options``
- Define ``mutually_exclusive`` options that cannot be passed together
- Control prompting in the presence of other options via ``prompt_when``
"""
def __init__(self, *args, **kwargs):
self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
self.prompt_when = kwargs.pop("prompt_when", None)
help_message = kwargs.get("help", "")
self.not_required_if = set(kwargs.pop("not_required_if", []))

self.prompt_when = set(kwargs.pop("prompt_when", []))
if self.prompt_when:
# ensure prompt text is configured,
# conditionally control whether we prompt in handle_parse_result
kwargs["prompt"] = True

self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
if self.mutually_exclusive:
ex_str = ", ".join(f'"{option}"' for option in self.mutually_exclusive)
kwargs[
"help"
] = f"{help_message}; cannot be used with these options: {ex_str}"
# augment help blurb to include details about mutually exclusive options
help_ = kwargs.get("help", "")
mutex = ", ".join(map(repr, self.mutually_exclusive))
kwargs["help"] = f"{help_}; cannot be used with these options: {mutex}"

super().__init__(*args, **kwargs)

def handle_parse_result(self, ctx, opts, args):
if self.name in opts:
ctx.obj.used_options.add(self.name)

if self.prompt_when is not None and self.prompt_when not in opts:
return None, args

def handle_parse_result(
self,
ctx: click.Context,
opts: Mapping[str, Any],
args: list[str],
) -> tuple[Any, list[str]]:
# determine whether mutex has been violated
if self.mutually_exclusive.intersection(opts) and self.name in opts:
mutually_exclusive = ", ".join(
f'"{option}"' for option in self.mutually_exclusive
)
raise click.UsageError(
f'Option "{self.name}" cannot be used with {mutually_exclusive}'
mutex = ", ".join(map(repr, self.mutually_exclusive))
raise click.UsageError(f"Option {self.name!r} cannot be used with {mutex}")

# determine whether we want to prompt for this argument
if self.prompt_when and not self.prompt_when.intersection(opts):
self.prompt = None

if (
self.not_required_if
and self.name not in opts
and not self.not_required_if.intersection(opts)
):
required = {self.name, *self.not_required_if}
raise click.MissingParameter(
ctx=ctx,
param_type="option",
param_hint=" / ".join(sorted(map(repr, required))),
)

return super().handle_parse_result(ctx, opts, args)
6 changes: 4 additions & 2 deletions conda_auth/hooks.py → conda_auth/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
HTTP_BASIC_AUTH_NAME,
TOKEN_NAME,
)
from .cli import auth_wrapper
from .cli import auth
from .constants import PLUGIN_NAME

ENV_COMMANDS = {
Expand All @@ -31,7 +31,9 @@ def conda_subcommands():
Registers subcommands
"""
yield CondaSubcommand(
name="auth", action=auth_wrapper, summary="Authentication commands for conda"
name="auth",
action=lambda args: auth(prog_name="conda auth", args=args),
summary="Authentication commands for conda",
)


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [
]

[project.entry-points.conda]
conda-auth = "conda_auth.hooks"
conda-auth = "conda_auth.plugin"

[tool.setuptools.packages]
find = {}
4 changes: 2 additions & 2 deletions tests/cli/test_group.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from conda_auth.cli import auth_wrapper
from conda_auth.cli import auth


def test_auth_wrapper():
Expand All @@ -11,4 +11,4 @@ def test_auth_wrapper():
exception.
"""
with pytest.raises(SystemExit):
auth_wrapper([])
auth([])
Loading

0 comments on commit ac3c143

Please sign in to comment.