Skip to content

Commit

Permalink
feat/(token): token endpoint is now configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Divan009 committed Oct 1, 2024
1 parent 8509dbe commit ea3182c
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 13 deletions.
40 changes: 28 additions & 12 deletions config/clients/python/template/src/credentials.py.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,31 @@ class Credentials:
"""
self._configuration = value

def _parse_issuer(self, issuer: str):
default_endpoint_path = '/oauth/token'

parsed_url = urlparse(issuer.strip())

try:
parsed_url.port
except ValueError as e:
raise ApiValueError(e)

if parsed_url.netloc is None and parsed_url.path is None:
raise ApiValueError(f"Invalid issuer")

if parsed_url.scheme == "":
parsed_url = urlparse(f"https://{issuer}")
elif parsed_url.scheme not in ("http", "https"):
raise ApiValueError(f"Invalid issuer scheme {parsed_url.scheme} must be HTTP or HTTPS")

if parsed_url.path in ("", "/"):
parsed_url = parsed_url._replace(path=default_endpoint_path)

valid_url = urlunparse(parsed_url)

return valid_url

def validate_credentials_config(self):
"""
Check whether credentials configuration is valid
Expand All @@ -164,15 +189,6 @@ class Credentials:
if self.configuration is None or none_or_empty(self.configuration.client_id) or none_or_empty(self.configuration.client_secret) or none_or_empty(self.configuration.api_audience) or none_or_empty(self.configuration.api_issuer):
raise ApiValueError('configuration `{}` requires client_id, client_secret, api_audience and api_issuer defined for client_credentials method.')
# validate token issuer
combined_url = 'https://' + self.configuration.api_issuer
parsed_url = None
try:
parsed_url = urlparse(combined_url)
except ValueError:
raise ApiValueError(
f"api_issuer `{self.configuration.api_issuer}` is invalid"
)
if (parsed_url.netloc == ''):
raise ApiValueError(
f"api_issuer `{self.configuration.api_issuer}` is invalid"
)

parsed_issuer_url = self._parse_issuer(self.configuration.api_issuer)

58 changes: 57 additions & 1 deletion config/clients/python/template/test/credentials_test.py.mustache
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
{{>partial_header}}

from unittest import IsolatedAsyncioTestCase
import unittest

import {{packageName}}{{^asyncio}} as openfga_sdk{{/asyncio}}

from {{packageName}}.credentials import CredentialConfiguration, Credentials

from {{packageName}}.exceptions import ApiValueError

class TestCredentials(IsolatedAsyncioTestCase):
"""Credentials unit test"""
Expand Down Expand Up @@ -131,3 +132,58 @@ class TestCredentials(IsolatedAsyncioTestCase):
with self.assertRaises(openfga_sdk.ApiValueError):
credential.validate_credentials_config()


class TestCredentialsIssuer(unittest.TestCase):
def setUp(self):
# Setup a basic configuration that can be modified per test case
self.configuration = CredentialConfiguration(api_issuer="https://example.com")
self.credentials = Credentials(method="client_credentials", configuration=self.configuration)

def test_valid_issuer_https(self):
# Test a valid HTTPS URL
self.configuration.api_issuer = "issuer.fga.example "
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://issuer.fga.example/oauth/token")

def test_valid_issuer_with_oauth_endpoint_https(self):
# Test a valid HTTPS URL
self.configuration.api_issuer = "https://example.com/oauth/token"
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://example.com/oauth/token")

def test_valid_issuer_with_some_endpoint_https(self):
# Test a valid HTTPS URL
self.configuration.api_issuer = "https://example.com/oauth/some/endpoint"
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://example.com/oauth/some/endpoint")

def test_valid_issuer_http(self):
# Test a valid HTTP URL
self.configuration.api_issuer = "fga.example/some_endpoint"
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://fga.example/some_endpoint")

def test_invalid_issuer_no_scheme(self):
# Test an issuer URL without a scheme
self.configuration.api_issuer = "https://issuer.fga.example:8080/some_endpoint "
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://issuer.fga.example:8080/some_endpoint")

def test_invalid_issuer_bad_scheme(self):
# Test an issuer with an unsupported scheme
self.configuration.api_issuer = "ftp://example.com"
with self.assertRaises(ApiValueError):
self.credentials._parse_issuer(self.configuration.api_issuer)

def test_invalid_issuer_with_port(self):
# Test an issuer with an unsupported scheme
self.configuration.api_issuer = "https://issuer.fga.example:8080 "
result = self.credentials._parse_issuer(self.configuration.api_issuer)
self.assertEqual(result, "https://issuer.fga.example:8080/oauth/token")

# this should raise error
def test_invalid_issuer_bad_hostname(self):
# Test an issuer with an invalid hostname
self.configuration.api_issuer = "https://example?.com"
with self.assertRaises(ApiValueError):
self.credentials._parse_issuer(self.configuration.api_issuer)

0 comments on commit ea3182c

Please sign in to comment.