Skip to content

Commit

Permalink
Merge pull request #16 from conda-incubator/add-token-support
Browse files Browse the repository at this point in the history
Add token support
  • Loading branch information
travishathaway authored Oct 21, 2023
2 parents cc2e03c + 44a3aef commit e6d6ade
Show file tree
Hide file tree
Showing 21 changed files with 860 additions and 594 deletions.
144 changes: 100 additions & 44 deletions conda_auth/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,33 @@
from conda.models.channel import Channel

from .condarc import CondaRC, CondaRCError
from .constants import OAUTH2_NAME, HTTP_BASIC_AUTH_NAME
from .exceptions import CondaAuthError, InvalidCredentialsError
from .handlers import AuthManager, oauth2_manager, basic_auth_manager
from .exceptions import CondaAuthError
from .handlers import (
AuthManager,
basic_auth_manager,
token_auth_manager,
HTTP_BASIC_AUTH_NAME,
TOKEN_NAME,
)
from .options import CustomOption

# Constants
AUTH_MANAGER_MAPPING = {
OAUTH2_NAME: oauth2_manager,
HTTP_BASIC_AUTH_NAME: basic_auth_manager,
TOKEN_NAME: token_auth_manager,
}
SUCCESSFUL_LOGIN_MESSAGE = "Successfully logged in"
SUCCESSFUL_LOGOUT_MESSAGE = "Successfully logged out"

SUCCESSFUL_LOGIN_MESSAGE = "Successfully stored credentials"

SUCCESSFUL_LOGOUT_MESSAGE = "Successfully removed credentials"

SUCCESSFUL_COLOR = "green"
MAX_LOGIN_ATTEMPTS = 3

FAILURE_COLOR = "red"

VALID_AUTH_CHOICES = tuple(AUTH_MANAGER_MAPPING.keys())

OPTION_DEFAULT = "CONDA_AUTH_DEFAULT"


def parse_channel(ctx, param, value):
Expand All @@ -28,28 +43,48 @@ def parse_channel(ctx, param, value):
return Channel(value)


def get_auth_manager(options) -> tuple[str, AuthManager]:
class ExtraContext:
"""
Used to provide more information about the running environment
"""

def __init__(self):
self.used_options = set()


def get_auth_manager(options, extra_context: ExtraContext) -> tuple[str, AuthManager]:
"""
Based on CLI options provided, return the correct auth manager to use.
"""
auth_type = options.get("type") or options.get("auth")
auth_type = options.get("auth")

if auth_type is not None:
auth_manager = AUTH_MANAGER_MAPPING.get(auth_type)
if auth_manager is None:
raise CondaAuthError(
f'Invalid authentication type. Valid types are: "{HTTP_BASIC_AUTH_NAME}"'
f'Invalid authentication type. Valid types are: "{", ".join(VALID_AUTH_CHOICES)}"'
)
return auth_type, auth_manager

# we use http basic auth when username or password are present
elif options.get("username") is not None or options.get("password") is not None:
# we use http basic auth when "username" or "password" are present
if "basic" in extra_context.used_options:
auth_manager = basic_auth_manager
auth_type = HTTP_BASIC_AUTH_NAME

# default authentication handler
# we use token auth when "token" is present
elif "token" in extra_context.used_options:
auth_manager = token_auth_manager
auth_type = TOKEN_NAME

# raise error if authentication type not found
else:
auth_manager = basic_auth_manager
auth_type = HTTP_BASIC_AUTH_NAME
raise CondaAuthError(
click.style(
"Please specify an authentication type to use"
" with either the `--basic` or `--token` options.",
fg=FAILURE_COLOR,
)
)

return auth_type, auth_manager

Expand All @@ -64,10 +99,12 @@ def get_channel_settings(channel: str) -> MutableMapping[str, str] | None:


@click.group("auth")
def group():
@click.pass_context
def group(ctx):
"""
Commands for handling authentication within conda
"""
ctx.obj = ExtraContext()


def auth_wrapper(args):
Expand All @@ -76,58 +113,77 @@ def auth_wrapper(args):


@group.command("login")
@click.option("-u", "--username", help="Username to use for HTTP Basic Authentication")
@click.option("-p", "--password", help="Password to use for HTTP Basic Authentication")
@click.argument("channel", callback=parse_channel)
@click.option(
"-u",
"--username",
help="Username to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
mutually_exclusive=("token",),
prompt_when="basic",
)
@click.option(
"-p",
"--password",
help="Password to use for private channels using HTTP Basic Authentication",
cls=CustomOption,
prompt=True,
hide_input=True,
mutually_exclusive=("token",),
prompt_when="basic",
)
@click.option(
"-t",
"--type",
help='Manually specify the type of authentication to use. Choices are: "http-basic"',
"--token",
help="Token to use for private channels using an API token",
prompt=True,
prompt_required=False,
cls=CustomOption,
mutually_exclusive=("username", "password"),
)
@click.argument("channel", callback=parse_channel)
def login(channel: Channel, **kwargs):
@click.option(
"-b",
"--basic",
is_flag=True,
cls=CustomOption,
help="Save login credentials as HTTP basic authentication",
)
@click.pass_obj
def login(extra_context: ExtraContext, channel: Channel, **kwargs):
"""
Login to a channel
Log in to a channel by storing the credentials or tokens associated with it
"""
kwargs = {key: val for key, val in kwargs.items() if val is not None}
settings = get_channel_settings(channel.canonical_name) or {}
settings.update(kwargs)

auth_type, auth_manager = get_auth_manager(settings)
attempts = 0

while True:
try:
username = auth_manager.authenticate(channel, settings)
break
except InvalidCredentialsError as exc:
auth_manager.remove_channel_cache(channel.canonical_name)
attempts += 1
if attempts >= MAX_LOGIN_ATTEMPTS:
raise CondaAuthError(f"Max attempts reached; {exc}")
settings = {key: val for key, val in kwargs.items() if val is not None}

auth_type, auth_manager = get_auth_manager(settings, extra_context)
username: str | None = auth_manager.store(channel, settings)

click.echo(click.style(SUCCESSFUL_LOGIN_MESSAGE, fg=SUCCESSFUL_COLOR))

try:
condarc = CondaRC()
condarc.update_channel_settings(channel.canonical_name, username, auth_type)
if auth_type == TOKEN_NAME:
username = None
condarc.update_channel_settings(channel.canonical_name, auth_type, username)
condarc.save()
except CondaRCError as exc:
raise CondaAuthError(str(exc))


@group.command("logout")
@click.argument("channel", callback=parse_channel)
def logout(channel: Channel):
@click.pass_obj
def logout(extra_context: ExtraContext, channel: Channel):
"""
Logout of a channel
Log out of a channel by removing any credentials or tokens associated with it.
"""
settings = get_channel_settings(channel.canonical_name)

if settings is None:
raise CondaAuthError("Unable to find information about logged in session.")

settings["type"] = settings["auth"]
auth_type, auth_manager = get_auth_manager(settings)
auth_type, auth_manager = get_auth_manager(settings, extra_context)
auth_manager.remove_secret(channel, settings)

click.echo(click.style(SUCCESSFUL_LOGOUT_MESSAGE, fg=SUCCESSFUL_COLOR))
13 changes: 11 additions & 2 deletions conda_auth/condarc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ def __init__(self, condarc_path: Path | None = None):
except YAMLError as exc:
raise CondaRCError(f"Could not parse condarc: {exc}")

def update_channel_settings(self, channel: str, username: str, auth_type: str):
def update_channel_settings(
self, channel: str, auth_type: str, username: str | None = None
):
"""
Update the condarc file's "channel_settings" section
"""
updated_settings = {"channel": channel, "auth": auth_type, "username": username}
if username is None:
updated_settings = {"channel": channel, "auth": auth_type}
else:
updated_settings = {
"channel": channel,
"auth": auth_type,
"username": username,
}

channel_settings = self.loaded_yaml.get("channel_settings", []) or []

Expand Down
7 changes: 0 additions & 7 deletions conda_auth/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,4 @@

PLUGIN_NAME = "conda-auth"

# move to the handlers module
OAUTH2_NAME = "oauth2"

# move to the handlers module
HTTP_BASIC_AUTH_NAME = "http-basic"

# Error messages
LOGOUT_ERROR_MESSAGE = "Unable to logout."
12 changes: 7 additions & 5 deletions conda_auth/handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# flake8: noqa: F401
from .base import AuthManager
from .oauth2 import (
OAuth2Manager,
OAuth2Handler,
manager as oauth2_manager,
)
from .basic_auth import (
BasicAuthManager,
BasicAuthHandler,
manager as basic_auth_manager,
HTTP_BASIC_AUTH_NAME,
)
from .token import (
TokenAuthManager,
TokenAuthHandler,
manager as token_auth_manager,
TOKEN_NAME,
)
67 changes: 18 additions & 49 deletions conda_auth/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,24 @@

import conda.base.context
import keyring
import requests
from conda.gateways.connection.session import CondaSession
from conda.models.channel import Channel

from ..exceptions import InvalidCredentialsError

INVALID_CREDENTIALS_ERROR_MESSAGE = "Provided credentials are not correct."
from conda.base.context import context as global_context


class AuthManager(ABC):
"""
Defines an interface for auth handlers to use within plugin
"""

def __init__(self, context: conda.base.context.Context, cache: dict | None = None):
def __init__(
self,
context: conda.base.context.Context | None = None,
cache: dict | None = None,
):
"""
Optionally set a cache to use and configuration parameters to retrieve from
``conda.base.context.context.channel_settings``.
Optionally set a cache and context object to use
"""
self._context = context
self._context = context or global_context
self._cache = {} if cache is None else cache

def hook_action(self, command: str) -> None:
Expand All @@ -38,9 +36,9 @@ def hook_action(self, command: str) -> None:
channel.canonical_name in self._context.channels
and settings.get("auth") == self.get_auth_type()
):
self.authenticate(channel, settings)
self.store(channel, settings)

def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str:
def store(self, channel: Channel, settings: Mapping[str, str]) -> str:
"""
Used to retrieve credentials and store them on the ``cache`` property
Expand All @@ -52,7 +50,6 @@ def authenticate(self, channel: Channel, settings: Mapping[str, str]) -> str:
}
username, secret = self.fetch_secret(channel, extra_params)

verify_credentials(channel, self.get_auth_class())
self.save_credentials(channel, username, secret)

return username
Expand All @@ -64,9 +61,7 @@ def save_credentials(self, channel: Channel, username: str, secret: str) -> None
TODO: Method may be expanded in the future to allow the use of other storage
mechanisms.
"""
keyring.set_password(
self.get_keyring_id(channel.canonical_name), username, secret
)
keyring.set_password(self.get_keyring_id(channel), username, secret)

def fetch_secret(
self, channel: Channel, settings: Mapping[str, str | None]
Expand All @@ -93,14 +88,14 @@ def get_secret(self, channel_name: str) -> tuple[str | None, str | None]:

return secrets

def remove_channel_cache(self, channel_name: str) -> None:
def cache_clear(self, channel_name: str | None = None) -> None:
"""
Removes the cached secret for the given channel name
Remove the internal cache for the manager object
"""
try:
del self._cache[channel_name]
except KeyError:
pass
if channel_name:
self._cache.pop(channel_name, None)
else:
self._cache.clear()

@abstractmethod
def _fetch_secret(
Expand All @@ -127,7 +122,7 @@ def get_config_parameters(self) -> tuple[str, ...]:
"""

@abstractmethod
def get_keyring_id(self, channel_name: str) -> str:
def get_keyring_id(self, channel: Channel) -> str:
"""
Implementation should return the keyring id that will be used by the manager classes
"""
Expand All @@ -138,29 +133,3 @@ def get_auth_class(self) -> type:
Returns the authentication class to use (requests.auth.AuthBase subclass) for the given
authentication manager
"""


def verify_credentials(channel: Channel, auth_cls: type) -> None:
"""
Verify the credentials that have been currently set for the channel.
Raises exception if unable to make a successful request.
TODO:
We need a better way to tell if the credentials work. We might need
to fetch (or perform a HEAD request) on something specific like
repodata.json.
"""
for url in channel.base_urls:
session = CondaSession(auth=auth_cls(channel.canonical_name))
resp = session.head(url, allow_redirects=False)

try:
resp.raise_for_status()
except requests.exceptions.HTTPError as exc:
if exc.response.status_code == requests.codes["unauthorized"]:
error_message = INVALID_CREDENTIALS_ERROR_MESSAGE
else:
error_message = str(exc)

raise InvalidCredentialsError(error_message)
Loading

0 comments on commit e6d6ade

Please sign in to comment.