Skip to content

Commit

Permalink
Merge pull request #210 from jaro221/2.0-alpha-tests
Browse files Browse the repository at this point in the history
2.0-alpha-tests
  • Loading branch information
tdviet authored Jan 21, 2025
2 parents 9b261f8 + 320117a commit 682c61a
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 13 deletions.
50 changes: 50 additions & 0 deletions fedcloudclient/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import jwt
import liboidcagent as agent
import requests
import os
import re

from fedcloudclient.conf import CONF as CONF
from fedcloudclient.exception import TokenError
from fedcloudclient.logger import log_and_raise



class Token:
"""
Abstract object for managing tokens
Expand All @@ -23,6 +26,7 @@ def get_token_type(self):
...



class OIDCToken(Token):
"""
OIDC tokens. Managing access tokens, oidc-agent account and mytoken
Expand All @@ -35,6 +39,7 @@ def __init__(self, access_token=None):
self.oidc_agent_account = None
self.mytoken = None
self.user_id = None
self._VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"

def get_token(self):
"""
Expand Down Expand Up @@ -67,6 +72,7 @@ def get_user_id(self) -> str:
Return use ID
:return:
"""

if not self.payload:
self.decode_token()
return self.user_id
Expand All @@ -87,6 +93,8 @@ def get_token_from_oidc_agent(self, oidc_agent_account: str) -> str:
)
self.access_token = access_token
self.oidc_agent_account = oidc_agent_account


return access_token
except agent.OidcAgentError as exception:
error_msg = f"Error getting access token from oidc-agent: {exception}"
Expand Down Expand Up @@ -140,6 +148,9 @@ def multiple_token(self, access_token: str, oidc_agent_account: str, mytoken: st
"""
if mytoken:
try:

"""need to implement from mytoken and check"""

self.get_token_from_mytoken(mytoken)
return
except TokenError:
Expand All @@ -154,3 +165,42 @@ def multiple_token(self, access_token: str, oidc_agent_account: str, mytoken: st
self.access_token = access_token
return
log_and_raise("Cannot get access token", TokenError)

def oidc_discover(self) -> dict:
"""
:param oidc_url: CheckIn URL get from payload
:return: JSON object of OIDC configuration
"""
oidc_url=self.payload["iss"]
request = requests.get(oidc_url + "/.well-known/openid-configuration")
request.raise_for_status()
self.request_json=request.json()
return self.request_json

def token_list_vos(self):
"""
List VO memberships in EGI Check-in
:return: list of VO names
"""

oidc_ep = self.request_json
z_user_info=oidc_ep["userinfo_endpoint"]
z_head={"Authorization": f"Bearer {self.access_token}"}

request = requests.get(
oidc_ep["userinfo_endpoint"],
headers={"Authorization": f"Bearer {self.access_token}"},
)

request.raise_for_status()
vos = set()
pattern = re.compile(self._VO_PATTERN)
for claim in request.json().get("eduperson_entitlement", []):
vo = pattern.match(claim)
if vo:
vos.add(vo.groups()[0])
request.raise_for_status()

return sorted(vos)


96 changes: 88 additions & 8 deletions fedcloudclient/auth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,103 @@
Testing unit for auth.py
"""
import os
from colorama import init as colorama_init
from colorama import Fore
from colorama import Style

import fedcloudclient.auth as auth
from fedcloudclient.conf import CONF as CONF

VO_PATTERN = "urn:mace:egi.eu:group:(.+?):(.+:)*role=member#aai.egi.eu"

def get_token_from_mytoken_decode_verify(mytoken: str, user_id: str):
def verify_MYTOKEN(mytoken: str) -> str:
"""
Get access token from mytoken server, decode, get user ID and verify
:return:
"""

token = auth.OIDCToken()
token.get_token_from_mytoken(mytoken)
token_id = token.get_user_id()
assert token_id == user_id
try:
access_token_mytoken=token.get_token_from_mytoken(mytoken, None)
return access_token_mytoken
except:
return print(f"No MYTOKEN")


def verify_OIDC_AGENT(user_id:str) -> str:
token = auth.OIDCToken()
try:
access_token_oidc=token.get_token_from_oidc_agent(user_id)
return access_token_oidc
except:
return print(f"No OIDC_AGENT_ACCOUNT")



def verify_ACCESS_TOKEN(access_token:str) -> str:
token = auth.OIDCToken()
try:
token.access_token=access_token
return token.access_token
except:
return print(f"Error with ACCESS_TOKEN")

def verify_user_id(access_token:str) -> str:
token = auth.OIDCToken()
token.access_token=access_token
try:
user_id=token.get_user_id()
return user_id
except:
print("No user_id!")

def verify_pyload(access_token:str) -> dict:
token = auth.OIDCToken()
token.access_token=access_token
#try:
user_id=token.get_user_id()
payload=token.payload
request_json=token.oidc_discover()
list_vos=token.token_list_vos()
return payload,request_json,list_vos
#except:
# print("No user_id!")


def printing_dict(var_dict:dict):
for idx, item in enumerate(var_dict):
print(f"{item}:\t {var_dict[item]}")


if __name__ == "__main__":
mytoken = os.environ["FEDCLOUD_MYTOKEN"]
user_id = os.environ["FEDCLOUD_ID"]
get_token_from_mytoken_decode_verify(mytoken, user_id)
print(f"Start of verifying auth.py")

access_token= os.environ.get("ACCESS_TOKEN","")
access_token_check=verify_ACCESS_TOKEN(access_token)

mytoken=os.environ.get("FEDCLOUD_MYTOKEN","")
access_token_mytok=verify_MYTOKEN(mytoken)

oidc_agent_name=os.environ.get("OIDC_AGENT_ACCOUNT","")
access_token_oidc=verify_OIDC_AGENT(oidc_agent_name)

user_id=verify_user_id(access_token_oidc)
payload,request_json,list_vos=verify_pyload(access_token_oidc)


print(f"{type(payload)}")
printing_dict(payload)
print("-------------------------------------------------")
printing_dict(request_json)
print("-------------------------------------------------")
print(list_vos)
print(f"Break")










9 changes: 6 additions & 3 deletions fedcloudclient/checkin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def print_error(message, quiet):
print(message, file=sys.stderr)


""" Included in auth.py, line 50"""
def decode_token(oidc_access_token):
"""
Decoding access token to a dict
Expand All @@ -44,6 +45,7 @@ def decode_token(oidc_access_token):
return payload



def oidc_discover(oidc_url):
"""
Discover OIDC endpoints
Expand All @@ -57,6 +59,7 @@ def oidc_discover(oidc_url):
return request.json()


""" Included in auth.py, line 74"""
def get_token_from_oidc_agent(oidc_agent_account, quiet=False):
"""
Get access token from oidc-agent
Expand All @@ -82,6 +85,7 @@ def get_token_from_oidc_agent(oidc_agent_account, quiet=False):
return None


""" Included in auth.py, line 99"""
def get_token_from_mytoken_server(mytoken, mytoken_server, quiet=False):
"""
Get access token from mytoken server
Expand Down Expand Up @@ -149,7 +153,7 @@ def check_token(oidc_token, verbose=False):

def get_checkin_id(
oidc_token,
):
):
"""
Get EGI Check-in ID from access token
Expand All @@ -167,8 +171,7 @@ def get_access_token(
oidc_access_token,
oidc_agent_account,
mytoken,
mytoken_server,
):
mytoken_server,):
"""
Get access token
Generates new access token from oidc-agent
Expand Down
4 changes: 3 additions & 1 deletion fedcloudclient/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import yaml
from tabulate import tabulate

from fedcloudclient.exception import ConfigError
#from fedcloudclient.exception import ConfigError

DEFAULT_CONFIG_LOCATION = Path.home() / ".config/fedcloud/config.yaml"
DEFAULT_SETTINGS = {
Expand Down Expand Up @@ -137,13 +137,15 @@ def create(config_file: str):
envvar="FEDCLOUD_CONFIG_FILE",
show_default=True,
)

@click.option(
"--output-format",
"-f",
required=False,
help="Output format",
type=click.Choice(["text", "YAML", "JSON"], case_sensitive=False),
)

def show(config_file: str, output_format: str):
"""Show actual client configuration """
saved_config = load_config(config_file)
Expand Down
13 changes: 12 additions & 1 deletion fedcloudclient/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_shell_type():

return Shell.LINUX


""" Imported to the sites """
def print_set_env_command(name, value):
"""
Print command to set environment variable,
Expand All @@ -62,3 +62,14 @@ def print_comment(comment):
print(f"# {comment!s}")
else:
print(f"rem {comment!s}")


out_1=Shell(1)

print(type(out_1))
print(Shell.LINUX)

print(print_comment({"gewgweg": False}))
print(f"Done")


0 comments on commit 682c61a

Please sign in to comment.