Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suggested changes for token support (#16) #22

Merged
merged 5 commits into from
Oct 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.3.0
rev: v4.5.0
hooks:
- id: check-yaml
exclude: "(mkdocs.yml|recipe/meta.yaml)"
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/pre-commit/mirrors-mypy
rev: '' # Use the sha / tag you want to point at
rev: v1.6.1
hooks:
- id: mypy
additional_dependencies: ['types-requests']
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
rev: v3.15.0
hooks:
- id: pyupgrade
args: ["--py310-plus"]
- repo: https://github.com/akaihola/darker
rev: 1.5.1
rev: 1.7.2
hooks:
- id: darker
additional_dependencies: [black==22.10.0]
- repo: https://github.com/PyCQA/flake8
rev: 6.0.0
rev: 6.1.0
hooks:
- id: flake8
158 changes: 59 additions & 99 deletions conda_auth/cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import MutableMapping
from typing import Literal

import click
from conda.base.context import context
Expand All @@ -15,7 +15,7 @@
HTTP_BASIC_AUTH_NAME,
TOKEN_NAME,
)
from .options import CustomOption
from .options import ConditionalOption

# Constants
AUTH_MANAGER_MAPPING = {
Expand All @@ -31,133 +31,88 @@

FAILURE_COLOR = "red"

VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys())

OPTION_DEFAULT = "CONDA_AUTH_DEFAULT"


def parse_channel(ctx, param, value):
"""
Converts the channel name into a Channel object
"""
return Channel(value)


class ExtraContext:
"""
Used to provide more information about the running environment
"""

def __init__(self):
self.used_options = set()


def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthManager]:
def get_auth_manager(
auth: str | None = None,
basic: bool | None = None,
token: str | Literal[False] | None = None,
**kwargs,
) -> tuple[str, AuthManager]:
"""
Based on CLI options provided, return the correct auth manager to use.
"""
auth_type = options.get("auth")

if auth_type is not None:
auth_manager = AUTH_MANAGER_MAPPING.get(auth_type)
if auth_manager is None:
raise CondaAuthError(
f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"'
)
return auth_type, auth_manager

# we use http basic auth when "username" or "password" are present
if "basic" in extra_context.used_options:
auth_manager = basic_auth_manager
auth_type = HTTP_BASIC_AUTH_NAME

# we use token auth when "token" is present
elif "token" in extra_context.used_options:
auth_manager = token_auth_manager
auth_type = TOKEN_NAME

# raise error if authentication type not found
if auth: # set in .condarc
pass
elif basic: # defined on CLI
auth = HTTP_BASIC_AUTH_NAME
elif token: # defined on CLI
auth = TOKEN_NAME
else:
raise CondaAuthError("Missing authentication type.")

# check if auth defined maps to a valid auth manager
if not (auth_manager := AUTH_MANAGER_MAPPING.get(auth)):
raise CondaAuthError(
click.style(
"Please specify an authentication type to use"
" with either the `--basic` or `--token` options.",
fg=FAILURE_COLOR,
)
"Invalid authentication type. "
f"Valid types are: {set(AUTH_MANAGER_MAPPING)}"
)

return auth_type, auth_manager


def get_channel_settings(channel: str) -> MutableMapping[str, str] | None:
"""
Retrieve the channel settings from the context object
"""
for settings in context.channel_settings:
if settings.get("channel") == channel:
return dict(**settings)
return auth, auth_manager


@click.group("auth")
@click.pass_context
def group(ctx):
@click.group("auth", context_settings={"help_option_names": ["-h", "--help"]})
def auth():
"""
Commands for handling authentication within conda
"""
ctx.obj = ExtraContext()


def auth_wrapper(args):
"""Authentication commands for conda"""
group(args=args, prog_name="conda auth", standalone_mode=True)


@group.command("login")
@click.argument("channel", callback=parse_channel)
@auth.command("login")
@click.argument("channel", callback=lambda ctx, param, value: Channel(value))
@click.option(
"-b",
"--basic",
help="Save login credentials as HTTP basic authentication",
cls=ConditionalOption,
is_flag=True,
mutually_exclusive={"token"},
not_required_if={"token"},
)
@click.option(
"-u",
"--username",
help="Username to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
mutually_exclusive=("token",),
prompt_when="basic",
cls=ConditionalOption,
prompt_when={"basic"},
mutually_exclusive={"token"},
)
@click.option(
"-p",
"--password",
help="Password to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
cls=ConditionalOption,
prompt_when={"basic"},
hide_input=True,
mutually_exclusive=("token",),
prompt_when="basic",
mutually_exclusive={"token"},
)
@click.option(
"-t",
"--token",
help="Token to use for private channels using an API token",
cls=ConditionalOption,
prompt=True,
prompt_required=False,
cls=CustomOption,
mutually_exclusive=("username", "password"),
mutually_exclusive={"basic", "username", "password"},
not_required_if={"basic"},
)
@click.option(
"-b",
"--basic",
is_flag=True,
cls=CustomOption,
help="Save login credentials as HTTP basic authentication",
)
@click.pass_obj
def login(extra_context: ExtraContext, channel: Channel, **kwargs):
def login(channel: Channel, **kwargs):
"""
Log in to a channel by storing the credentials or tokens associated with it
"""
settings = {key: val for key, val in kwargs.items() if val is not None}

auth_type, auth_manager = get_auth_manager(settings, extra_context)
username: str | None = auth_manager.store(channel, settings)
auth_type, auth_manager = get_auth_manager(**kwargs)
username: str | None = auth_manager.store(channel, kwargs)

click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR))

Expand All @@ -171,19 +126,24 @@ def login(extra_context: ExtraContext, channel: Channel, **kwargs):
raise CondaAuthError(str(exc))


@group.command("logout")
@click.argument("channel", callback=parse_channel)
@click.pass_obj
def logout(extra_context: ExtraContext, channel: Channel):
@auth.command("logout")
@click.argument("channel", callback=lambda ctx, param, value: Channel(value))
def logout(channel: Channel):
"""
Log out of a channel by removing any credentials or tokens associated with it.
"""
settings = get_channel_settings(channel.canonical_name)

if settings is None:
settings = next(
(
settings
for settings in context.channel_settings
if settings.get("channel") == channel.canonical_name
),
None,
)
if not settings:
raise CondaAuthError("Unable to find information about logged in session.")

auth_type, auth_manager = get_auth_manager(settings, extra_context)
auth_type, auth_manager = get_auth_manager(**settings)
auth_manager.remove_secret(channel, settings)

click.echo(click.style(SUCCESSFUL_LOGOUT_MESSAGE, fg=SUCCESSFUL_COLOR))
67 changes: 42 additions & 25 deletions conda_auth/options.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,62 @@
"""
Module for custom click.Option classes
"""
from __future__ import annotations
import click
from typing import Any
from collections.abc import Mapping


class CustomOption(click.Option):
class ConditionalOption(click.Option):
"""
Custom option that does the following things:

- Allows you to define a "mutually_exclusive" tuple so certain options cannot be passed
together
- If ``prompt=True`` is set, can optionally control it to be prompted only in the presence of
other options via ``prompt_when``
- Adds options which have been passed to ``ctx.obj.used_options``
- Define ``mutually_exclusive`` options that cannot be passed together
- Control prompting in the presence of other options via ``prompt_when``
"""
def __init__(self, *args, **kwargs):
self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
self.prompt_when = kwargs.pop("prompt_when", None)
help_message = kwargs.get("help", "")
self.not_required_if = set(kwargs.pop("not_required_if", []))

self.prompt_when = set(kwargs.pop("prompt_when", []))
if self.prompt_when:
# ensure prompt text is configured,
# conditionally control whether we prompt in handle_parse_result
kwargs["prompt"] = True

self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
if self.mutually_exclusive:
ex_str = ", ".join(f'"{option}"' for option in self.mutually_exclusive)
kwargs[
"help"
] = f"{help_message}; cannot be used with these options: {ex_str}"
# augment help blurb to include details about mutually exclusive options
help_ = kwargs.get("help", "")
mutex = ", ".join(map(repr, self.mutually_exclusive))
kwargs["help"] = f"{help_}; cannot be used with these options: {mutex}"

super().__init__(*args, **kwargs)

def handle_parse_result(self, ctx, opts, args):
if self.name in opts:
ctx.obj.used_options.add(self.name)

if self.prompt_when is not None and self.prompt_when not in opts:
return None, args

def handle_parse_result(
self,
ctx: click.Context,
opts: Mapping[str, Any],
args: list[str],
) -> tuple[Any, list[str]]:
# determine whether mutex has been violated
if self.mutually_exclusive.intersection(opts) and self.name in opts:
mutually_exclusive = ", ".join(
f'"{option}"' for option in self.mutually_exclusive
)
raise click.UsageError(
f'Option "{self.name}" cannot be used with {mutually_exclusive}'
mutex = ", ".join(map(repr, self.mutually_exclusive))
raise click.UsageError(f"Option {self.name!r} cannot be used with {mutex}")

# determine whether we want to prompt for this argument
if self.prompt_when and not self.prompt_when.intersection(opts):
self.prompt = None

if (
self.not_required_if
and self.name not in opts
and not self.not_required_if.intersection(opts)
):
required = {self.name, *self.not_required_if}
raise click.MissingParameter(
ctx=ctx,
param_type="option",
param_hint=" / ".join(sorted(map(repr, required))),
)

return super().handle_parse_result(ctx, opts, args)
6 changes: 4 additions & 2 deletions conda_auth/hooks.py → conda_auth/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
HTTP_BASIC_AUTH_NAME,
TOKEN_NAME,
)
from .cli import auth_wrapper
from .cli import auth
from .constants import PLUGIN_NAME

ENV_COMMANDS = {
Expand All @@ -31,7 +31,9 @@ def conda_subcommands():
Registers subcommands
"""
yield CondaSubcommand(
name="auth", action=auth_wrapper, summary="Authentication commands for conda"
name="auth",
action=lambda args: auth(prog_name="conda auth", args=args),
summary="Authentication commands for conda",
)


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ dependencies = [
]

[project.entry-points.conda]
conda-auth = "conda_auth.hooks"
conda-auth = "conda_auth.plugin"

[tool.setuptools.packages]
find = {}
4 changes: 2 additions & 2 deletions tests/cli/test_group.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from conda_auth.cli import auth_wrapper
from conda_auth.cli import auth


def test_auth_wrapper():
Expand All @@ -11,4 +11,4 @@ def test_auth_wrapper():
exception.
"""
with pytest.raises(SystemExit):
auth_wrapper([])
auth([])
Loading