diff --git a/openstackinabox/models/keystone/model.py b/openstackinabox/models/keystone/model.py index 3ba255c..fc11df1 100644 --- a/openstackinabox/models/keystone/model.py +++ b/openstackinabox/models/keystone/model.py @@ -305,6 +305,30 @@ def bool_to_database(value): return 1 return 0 + def add_admin_tenant_details(self): + self.__admin_tenant_args = { + 'name': 'system', + 'description': 'system administrator', + } + + def add_admin_user_details(self): + self.__admin_user_args = { + 'tenantid': self.__admin_tenant_id, + 'username': 'system', + 'email': 'system@stackinabox', + 'password': 'stackinabox', + 'apikey': '537461636b496e41426f78', + } + + @property + def get_admin_tenant_details(self): + return self.get_tenant_by_id(self.__admin_tenant_id) + + @property + def get_admin_user_details(self): + return self.get_user_by_id(self.__admin_tenant_id, + self.__admin_user_id) + def init_database(self): self.log_info('Initializing database') dbcursor = self.database.cursor() @@ -314,11 +338,13 @@ def init_database(self): # Create an admin user and add the admin token to that user self.__admin_tenant_id = self.add_tenant('system', 'system administrator') + self.__admin_user_id = self.add_user(self.__admin_tenant_id, 'system', 'system@stackinabox', 'stackinabox', '537461636b496e41426f78') + roles = [ KeystoneModel.IDENTITY_ADMIN_ROLE, KeystoneModel.IDENTITY_VIEWER_ROLE, diff --git a/openstackinabox/services/keystone/v2.py b/openstackinabox/services/keystone/v2.py index ab79148..e54afe9 100644 --- a/openstackinabox/services/keystone/v2.py +++ b/openstackinabox/services/keystone/v2.py @@ -38,6 +38,10 @@ class KeystoneV2Service(BaseService): '^\/users\/{0}/OS-KSADM/credentials$' .format(USER_ID_REGEX)) + USER_ID_ADMIN_PATH_REGEX = re.compile( + '^\/users\/{0}/RAX-AUTH/admins$' + .format(USER_ID_REGEX)) + @staticmethod def get_user_id_from_path(uri_path): uri_matcher = None @@ -45,6 +49,7 @@ def get_user_id_from_path(uri_path): regexes = [ KeystoneV2Service.USER_ID_PATH_REGEX, KeystoneV2Service.USER_ID_KSADM_CREDENTIAL_PATH_REGEX, + KeystoneV2Service.USER_ID_ADMIN_PATH_REGEX, ] for r in regexes: @@ -81,6 +86,12 @@ def __init__(self): self.register(BaseService.POST, KeystoneV2Service.USER_ID_KSADM_CREDENTIAL_PATH_REGEX, KeystoneV2Service.handle_add_credentials_to_user) + self.register(BaseService.GET, + KeystoneV2Service.USER_ID_ADMIN_PATH_REGEX, + KeystoneV2Service.handle_get_admin_user) + self.register(BaseService.GET, + KeystoneV2Service.USER_ID_KSADM_CREDENTIAL_PATH_REGEX, + KeystoneV2Service.handle_get_user_credentials) self.log_info('initialized') @property @@ -579,3 +590,117 @@ def handle_add_credentials_to_user(self, request, uri, headers): return (503, headers, 'Server error') return (201, headers, '') + + def handle_get_admin_user(self, request, uri, headers): + ''' + 200 -> OK + 400 -> Bad Request + 403 -> Forbidden + 404 -> Not Found + 405 -> Invalid Method + 413 -> Over Limit + 415 -> Bad Media Type + 503 -> Service Fault + + No body + + Response + { + "users": [ + { + "RAX-AUTH:defaultRegion": , + "RAX-AUTH:domainId": , + "email": + "enabled": True/False, + "id": , + "username": + } + ] + } + ''' + self.log_request(uri, request) + req_headers = request.headers + + user_data = self.helper_authenticate(req_headers, + headers, + True, + False) + if isinstance(user_data, tuple): + return user_data + + try: + user_id = KeystoneV2Service.get_user_id_from_path(uri) + self.log_debug('Lookup of user id {0} requested' + .format(user_id)) + + except Exception as ex: # pragma: no cover + self.log_exception('Failed to get user id from path') + return (400, headers, 'bad request') + + try: + user_info = self.model.get_user_by_id(user_data['tenantid'], + user_id) + except: + self.log_exception('failed to get user data') + return (404, headers, 'Not found') + + del user_info['password'] + del user_info['apikey'] + user_info['RAX-AUTH:DomainID'] = user_info['tenantid'] + + return (200, headers, json.dumps(user_info)) + + def handle_get_user_credentials(self, request, uri, headers): + ''' + 200, 203 -> OK + 400 -> Bad Request + 401 -> Unauthorized + 403 -> Forbidden + 404 -> Not Found + 405 -> Invalid Method + 413 -> Over Limit + 503 -> Service Fault + + No body + + Response + { + "passwordCredentials": { + "username": , + "password": + } + } + ''' + self.log_request(uri, request) + req_headers = request.headers + + user_data = self.helper_authenticate(req_headers, + headers, + True, + False) + if isinstance(user_data, tuple): + return user_data + try: + user_id = KeystoneV2Service.get_user_id_from_path(uri) + self.log_debug('Lookup of user id {0} requested' + .format(user_id)) + + except Exception as ex: # pragma: no cover + self.log_exception('Failed to get user id from path') + return (400, headers, 'bad request') + + try: + user_info = self.model.get_user_by_id(user_data['tenantid'], + user_id) + except: + self.log_exception('failed to get user data') + return (404, headers, 'Not found') + + user_credentials = { + 'passwordCredentials': { + 'username': user_info['username'], + 'password': user_info['password'] + } + } + json_data = json.dumps(user_credentials) + return (200, headers, json_data) \ No newline at end of file diff --git a/openstackinabox/tests/test_get_admin_user.py b/openstackinabox/tests/test_get_admin_user.py new file mode 100644 index 0000000..a919571 --- /dev/null +++ b/openstackinabox/tests/test_get_admin_user.py @@ -0,0 +1,110 @@ +""" +Stack-In-A-Box: Basic Test +""" +import json +import unittest + +import httpretty +import mock +import requests +import stackinabox.util_httpretty +from stackinabox.stack import StackInABox + +from openstackinabox.models.keystone.model import KeystoneModel +from openstackinabox.services.keystone import KeystoneV2Service + + +@httpretty.activate +class TestKeystoneV2GetAdmin(unittest.TestCase): + + def setUp(self): + super(TestKeystoneV2GetAdmin, self).setUp() + self.keystone = KeystoneV2Service() + + self.headers = { + 'x-auth-token': self.keystone.model.get_admin_token() + } + self.tenant_id = self.keystone.model.add_tenant(tenantname='neo', + description='The One') + self.user_info = { + 'user': { + 'username': 'trinity', + 'enabled': True, + 'email': 'trinity@theone.matrix', + 'password': 'Inl0veWithNeo' + } + } + self.user_info['user']['userid'] =\ + self.keystone.model.add_user(tenantid=self.tenant_id, + username=self.user_info['user'][ + 'username'], + email=self.user_info['user']['email'], + password=self.user_info['user'][ + 'password'], + enabled=self.user_info['user'][ + 'enabled']) + self.keystone.model.add_token(self.tenant_id, + self.user_info['user']['userid']) + self.keystone.model.add_user_role_by_rolename( + tenantid=self.tenant_id, + userid=self.user_info['user']['userid'], + rolename=self.keystone.model.IDENTITY_ADMIN_ROLE) + StackInABox.register_service(self.keystone) + + def tearDown(self): + super(TestKeystoneV2GetAdmin, self).tearDown() + StackInABox.reset_services() + + @staticmethod + def get_userid_url(host, userid): + return 'http://{0}/keystone/v2.0/users/{1}/RAX-AUTH/admins'\ + .format(host, userid) + + def test_get_admin_user_basic(self): + stackinabox.util_httpretty.httpretty_registration('localhost') + + user_data = self.keystone.model.get_token_by_userid( + self.user_info['user']['userid']) + + url = TestKeystoneV2GetAdmin.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + + self.headers['x-auth-token'] = user_data['token'] + res = requests.get(url, headers=self.headers, data='') + self.assertEqual(res.status_code, 200) + + def test_get_admin_user_incorrect_request(self): + stackinabox.util_httpretty.httpretty_registration('localhost') + + user_data = self.keystone.model.get_token_by_userid( + self.user_info['user']['userid']) + + url = TestKeystoneV2GetAdmin.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + + res = requests.get(url, headers=self.headers, data='') + self.assertEqual(res.status_code, 404) + + def test_get_admin_user_no_token(self): + stackinabox.util_httpretty.httpretty_registration('localhost') + + url = TestKeystoneV2GetAdmin.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + + res = requests.get(url, headers=None, data='') + self.assertEqual(res.status_code, 403) + + def test_get_admin_user_invalid_token(self): + stackinabox.util_httpretty.httpretty_registration('localhost') + + url = TestKeystoneV2GetAdmin.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + self.headers['x-auth-token'] = 'new_token' + res = requests.get(url, headers=self.headers, data='') + self.assertEqual(res.status_code, 401) + + \ No newline at end of file diff --git a/openstackinabox/tests/test_keystone_v2_get_user_credentials.py b/openstackinabox/tests/test_keystone_v2_get_user_credentials.py new file mode 100644 index 0000000..8f601c0 --- /dev/null +++ b/openstackinabox/tests/test_keystone_v2_get_user_credentials.py @@ -0,0 +1,111 @@ +import unittest +import json + +import requests + +import stackinabox.util_requests_mock +from stackinabox.stack import StackInABox +from openstackinabox.models.keystone.model import KeystoneModel +from openstackinabox.services.keystone import KeystoneV2Service + + +class TestGetUserCredentials(unittest.TestCase): + + def setUp(self): + super(TestGetUserCredentials, self).setUp() + self.keystone = KeystoneV2Service() + + self.headers = { + 'x-auth-token': self.keystone.model.get_admin_token() + } + self.tenant_id = self.keystone.model.add_tenant(tenantname='neo', + description='The One') + self.user_info = { + 'user': { + 'username': 'trinity', + 'enabled': True, + 'email': 'trinity@theone.matrix', + 'password': 'Inl0veWithNeo' + } + } + self.user_info['user']['userid'] =\ + self.keystone.model.add_user(tenantid=self.tenant_id, + username=self.user_info['user'][ + 'username'], + email=self.user_info['user']['email'], + password=self.user_info['user'][ + 'password'], + enabled=self.user_info['user'][ + 'enabled']) + self.keystone.model.add_token(self.tenant_id, + self.user_info['user']['userid']) + self.keystone.model.add_user_role_by_rolename( + tenantid=self.tenant_id, + userid=self.user_info['user']['userid'], + rolename=self.keystone.model.IDENTITY_ADMIN_ROLE) + StackInABox.register_service(self.keystone) + self.session = requests.Session() + + def tearDown(self): + super(TestGetUserCredentials, self).tearDown() + StackInABox.reset_services() + self.session.close() + + @staticmethod + def get_userid_url(host, userid): + return 'http://{0}/keystone/v2.0/users/{1}/OS-KSADM/credentials'\ + .format(host, userid) + + def test_get_user_credentials_basic(self): + with stackinabox.util_requests_mock.activate(): + stackinabox.util_requests_mock.requests_mock_registration( + 'localhost') + user_data = self.keystone.model.get_token_by_userid( + self.user_info['user']['userid']) + + url = TestGetUserCredentials.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + + self.headers['x-auth-token'] = user_data['token'] + res = requests.get(url, headers=self.headers, data='') + self.assertEqual(res.status_code, 200) + + def test_get_user_credentials_incorrect_request(self): + with stackinabox.util_requests_mock.activate(): + stackinabox.util_requests_mock.requests_mock_registration( + 'localhost') + + user_data = self.keystone.model.get_token_by_userid( + self.user_info['user']['userid']) + + url = TestGetUserCredentials.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + + res = requests.get(url, headers=self.headers, data='') + self.assertEqual(res.status_code, 404) + + def test_get_user_credentials_no_token(self): + with stackinabox.util_requests_mock.activate(): + stackinabox.util_requests_mock.requests_mock_registration( + 'localhost') + + url = TestGetUserCredentials.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + + res = requests.get(url, headers=None, data='') + self.assertEqual(res.status_code, 403) + + def test_get_user_credentials_invalid_token(self): + with stackinabox.util_requests_mock.activate(): + stackinabox.util_requests_mock.requests_mock_registration( + 'localhost') + + url = TestGetUserCredentials.get_userid_url( + 'localhost', + self.user_info['user']['userid']) + self.headers['x-auth-token'] = 'new_token' + res = requests.get(url, headers=self.headers, data='') + self.assertEqual(res.status_code, 401) diff --git a/openstackinabox/tests/test_model_basic.py b/openstackinabox/tests/test_model_basic.py new file mode 100644 index 0000000..f447ae4 --- /dev/null +++ b/openstackinabox/tests/test_model_basic.py @@ -0,0 +1,48 @@ +""" +Stack-In-A-Box: Basic Test +""" +import json +import unittest + +import httpretty +import mock +import requests +import stackinabox.util_httpretty +from stackinabox.stack import StackInABox + +from openstackinabox.models.keystone.model import KeystoneModel +from openstackinabox.services.keystone import KeystoneV2Service + + +@httpretty.activate +class TestKeystoneModel(unittest.TestCase): + + def setUp(self): + super(TestKeystoneModel, self).setUp() + self.keystone = KeystoneV2Service() + self.headers = { + 'x-auth-token': self.keystone.model.get_admin_token() + } + + def tearDown(self): + super(TestKeystoneModel, self).tearDown() + StackInABox.reset_services() + + def test_keystone_set_model(self): + with self.assertRaises(TypeError): + self.keystone.model = None + + self.keystone.model = KeystoneModel() + + def test_get_tenant_details(self): + stackinabox.util_httpretty.httpretty_registration('localhost') + tenant_details = self.keystone.model.get_admin_tenant_details + self.assertEqual(tenant_details['name'], 'system') + self.assertEqual(tenant_details['description'], 'system administrator') + + def test_get_user_details(self): + user_details = self.keystone.model.get_admin_user_details + self.assertEqual(user_details['username'], 'system') + self.assertEqual(user_details['email'], 'system@stackinabox') + self.assertEqual(user_details['password'], 'stackinabox') + self.assertEqual(user_details['apikey'], '537461636b496e41426f78')