diff --git a/cloudify/cluster.py b/cloudify/cluster.py deleted file mode 100644 index 200f84091..000000000 --- a/cloudify/cluster.py +++ /dev/null @@ -1,106 +0,0 @@ -######## -# Copyright (c) 2017-2019 Cloudify Platform Ltd. All rights reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. - -import re -import types -import random -import requests -import itertools - -from cloudify.utils import ipv6_url_compat - -from cloudify_rest_client import CloudifyClient -from cloudify_rest_client.client import HTTPClient -from cloudify_rest_client.exceptions import CloudifyClientError - - -class ClusterHTTPClient(HTTPClient): - - def __init__(self, host, *args, **kwargs): - # from outside, we get host passed in as a list (optionally). - # But we still need self.host to be the currently-used manager, - # and we can store the list as self.hosts - # (copy the list so that outside mutations don't affect us) - hosts = list(host) if isinstance(host, list) else [host] - hosts = [ipv6_url_compat(h) for h in hosts] - random.shuffle(hosts) - self.hosts = itertools.cycle(hosts) - super(ClusterHTTPClient, self).__init__(hosts[0], *args, **kwargs) - self.default_timeout_sec = self.default_timeout_sec or (5, None) - self.retries = 30 - self.retry_interval = 3 - - def do_request(self, method, url, *args, **kwargs): - kwargs.setdefault('timeout', self.default_timeout_sec) - - copied_data = None - if isinstance(kwargs.get('data'), types.GeneratorType): - copied_data = itertools.tee(kwargs.pop('data'), self.retries) - - errors = {} - for retry in range(self.retries): - manager_to_try = next(self.hosts) - self.host = manager_to_try - if copied_data is not None: - kwargs['data'] = copied_data[retry] - - try: - return super(ClusterHTTPClient, self).do_request( - method, url, *args, **kwargs) - except (requests.exceptions.ConnectionError) as error: - self.logger.debug( - 'Connection error when trying to connect to ' - 'manager {0}'.format(error) - ) - errors[manager_to_try] = error - continue - except CloudifyClientError as e: - errors[manager_to_try] = e.status_code - if e.response.status_code == 502: - continue - if e.response.status_code == 404 and \ - self._is_fileserver_download(e.response): - continue - else: - raise - - raise CloudifyClientError( - 'HTTP Client error: {0} {1} ({2})'.format( - method.__name__.upper(), - url, - ', '.join( - '{0}: {1}'.format(host, e) for host, e in errors.items() - ) - )) - - def _is_fileserver_download(self, response): - """Is this response a file-download response? - - 404 responses to requests that download files, need to be retried - with all managers in the cluster: if some file was not yet - replicated, another manager might have this file. - - This is because the file replication is asynchronous. - """ - if re.search('/(blueprints|snapshots)/', response.url): - return True - disposition = response.headers.get('Content-Disposition') - if not disposition: - return False - return disposition.strip().startswith('attachment') - - -class CloudifyClusterClient(CloudifyClient): - client_class = ClusterHTTPClient diff --git a/cloudify/manager.py b/cloudify/manager.py index bf158354f..7d64929ca 100644 --- a/cloudify/manager.py +++ b/cloudify/manager.py @@ -8,7 +8,8 @@ from cloudify.state import ctx, workflow_ctx, NotInContext from cloudify.exceptions import (HttpException, NonRecoverableError) -from cloudify.cluster import CloudifyClusterClient +from cloudify_rest_client.client import CloudifyClient +from cloudify_async_client.client import AsyncCloudifyClient class NodeInstance(object): @@ -123,7 +124,7 @@ def system_properties(self): return self._system_properties -def get_rest_client(tenant=None, api_token=None): +def get_rest_client(tenant=None, api_token=None, async_client=False): """ :param tenant: optional tenant name to connect as :param api_token: optional api_token to authenticate with (instead of @@ -150,7 +151,11 @@ def get_rest_client(tenant=None, api_token=None): else: token = utils.get_rest_token() - return CloudifyClusterClient( + client_cls = CloudifyClient + if async_client: + client_cls = AsyncCloudifyClient + + return client_cls( headers=headers, host=utils.get_manager_rest_service_host(), port=utils.get_manager_rest_service_port(), @@ -159,7 +164,8 @@ def get_rest_client(tenant=None, api_token=None): protocol=utils.get_manager_rest_service_protocol(), cert=utils.get_local_rest_certificate(), kerberos_env=utils.get_kerberos_indication( - os.environ.get(constants.KERBEROS_ENV_KEY)) + os.environ.get(constants.KERBEROS_ENV_KEY)), + retries=30, ) diff --git a/cloudify_async_client/__init__.py b/cloudify_async_client/__init__.py index 12b968e40..c49e0eaa0 100644 --- a/cloudify_async_client/__init__.py +++ b/cloudify_async_client/__init__.py @@ -1 +1,3 @@ -from cloudify_async_client.client import CloudifyAsyncClient # noqa +from cloudify_async_client.client import AsyncCloudifyClient + +__all__ = ['AsyncCloudifyClient'] diff --git a/cloudify_async_client/audit_log.py b/cloudify_async_client/audit_log.py index 3ec5336f1..da1c661ca 100644 --- a/cloudify_async_client/audit_log.py +++ b/cloudify_async_client/audit_log.py @@ -1,4 +1,3 @@ -from cloudify_async_client import CloudifyAsyncClient from cloudify_rest_client.audit_log import AuditLogClient @@ -14,20 +13,10 @@ async def stream(self, timeout=None, **kwargs): :return: ``ListResponse`` with of ``AuditLog`` items and response metadata. """ - client = await self.async_client() - response = await client.get('audit/stream', - params=kwargs, - timeout=timeout) - return response - - async def async_client(self): - headers = self.api.headers.copy() - headers.update({'Content-type': 'text/event-stream'}) - client = CloudifyAsyncClient( - host=self.api.host, - port=self.api.port, - protocol=self.api.protocol, - cert=self.api.cert, - headers=headers, + response = await self.api.get( + '/audit/stream', + params=kwargs, + timeout=timeout, + stream=True, ) - return client + return response diff --git a/cloudify_async_client/client.py b/cloudify_async_client/client.py index 03a293b1c..114675916 100644 --- a/cloudify_async_client/client.py +++ b/cloudify_async_client/client.py @@ -1,44 +1,183 @@ +import json +import itertools +import logging +import numbers import ssl +import types +import aiohttp.client_exceptions -class CloudifyAsyncClient: - host: str - port: int - protocol: str - headers: dict - cert: str - - def __init__(self, **kwargs): - # only import aiohttp if this is used - otherwise we pay the price - # on every import of the rest-client - import aiohttp - self._aiohttp = aiohttp - - self.host = kwargs.pop('host', 'localhost') - self.port = kwargs.pop('port', 443) - self.protocol = kwargs.pop('protocol', 'https') - self.headers = kwargs.pop('headers', {}) - self.cert = kwargs.pop('cert') - self.ssl = ssl.create_default_context(cafile=self.cert) - self.api_version = 'v3.1' - self.session = self._aiohttp.ClientSession(headers=self.headers) +from cloudify_rest_client import exceptions +from cloudify_rest_client.client import HTTPClientBase, CloudifyClient + +from cloudify_async_client.audit_log import AuditLogAsyncClient + + +class AsyncHTTPClient(HTTPClientBase): + def __init__(self, *args, **kwargs): + session = kwargs.pop('session', None) + timeout = kwargs.pop('timeout', None) + super().__init__(*args, **kwargs) + # can't use base class' timeout because it's a tuple there, and + # aiohttp needs a single int + self.default_timeout_sec = timeout or 5 + + self._aiohttp = None + if session is None: + session = self.aiohttp.ClientSession(headers=self.headers) + self._session = session + + if self.trust_all: + self.ssl = ssl.create_default_context() + self.ssl.check_hostname = False + self.ssl.verify_mode = ssl.CERT_NONE + else: + self.ssl = ssl.create_default_context(cafile=self.cert) @property - def url(self): - return '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host, - self.port, self.api_version) - - def get(self, url, params=None, timeout=300, **kwargs): - if isinstance(timeout, int) or isinstance(timeout, float): - timeout = self._aiohttp.ClientTimeout(total=timeout) - - if params: - # Format query parameters and pass params only if it is not empty - p = {k: str(v) for k, v in params.items() if v is not None} - if p: - kwargs['params'] = p - - return self.session.get(f"{self.url}/{url}", - ssl=self.ssl, - timeout=timeout, - **kwargs) + def aiohttp(self): + if self._aiohttp is None: + import aiohttp + self._aiohttp = aiohttp + return self._aiohttp + + async def do_request( + self, + method, + uri, + data, + params, + headers, + expected_status_code, + stream, + verify, + timeout, + wrapper, + versioned_url=True, + ): + session_method = getattr(self._session, method.lower(), None) + if session_method is None: + raise RuntimeError(f'Unknown method: {method}') + + copied_data = None + if isinstance(data, types.GeneratorType): + copied_data = itertools.tee(data, self.retries) + elif isinstance(data, dict): + data = json.dumps(data) + + errors = {} + for retry in range(self.retries): + manager_to_try = self.get_host() + request_url = self.get_request_url( + manager_to_try, + uri, + versioned=versioned_url, + ) + if copied_data is not None: + data = copied_data[retry] + try: + response = await session_method( + request_url, + data=data, + params=params, + headers=headers, + ssl=self.ssl, + timeout=timeout or self.default_timeout_sec, + auth=self.auth, + ) + return await self.process_response( + response, + expected_status_code, + stream, + wrapper, + ) + + except aiohttp.client_exceptions.ClientSSLError as e: + errors[manager_to_try] = exceptions.format_ssl_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except aiohttp.client_exceptions.ClientConnectionError as e: + errors[manager_to_try] = exceptions.format_connection_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except exceptions.CloudifyClientError as e: + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + errors[manager_to_try] = e.status_code + if e.status_code == 502: + continue + if e.status_code == 404 and \ + self._is_fileserver_download(e.response): + continue + else: + raise + + mgr_errors = ', '.join(f'{host}: {e}' for host, e in errors.items()) + raise exceptions.CloudifyClientError( + f'HTTP Client error: {method} {uri} ({mgr_errors})') + + async def process_response( + self, + response, + expected_status_code, + stream, + wrapper + ): + if self.logger.isEnabledFor(logging.DEBUG): + for hdr, hdr_content in response.request_info.headers.items(): + self.logger.debug('request header: %s: %s', hdr, hdr_content) + self.logger.debug('reply: "%s %s" %s', response.status, + response.reason, response.content) + for hdr, hdr_content in response.headers.items(): + self.logger.debug('response header: %s: %s', hdr, hdr_content) + + if isinstance(expected_status_code, numbers.Number): + expected_status_code = [expected_status_code] + if response.status not in expected_status_code: + raise exceptions.CloudifyClientError.from_response( + response, response.status, response.request_info.url) + + if response.status == 204: + return None + + if stream: + return response + + response_json = await response.json() + + if response.history: + response_json['history'] = response.history + + if wrapper: + return wrapper(response_json) + await response.close() + + return response_json + + async def close(self): + await self._session.close() + + +class AsyncCloudifyClient(CloudifyClient): + client_class = AsyncHTTPClient + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.auditlog = AuditLogAsyncClient(self._client) + + @property + def community_contacts(self): + raise RuntimeError('async client does not support community_contacts') + + @property + def deployment_groups(self): + raise RuntimeError('async client does not support deployment_groups') + + @property + def log_bundles(self): + raise RuntimeError('async client does not support log_bundles') + + def close(self): + return self._client.close() diff --git a/cloudify_rest_client/agents.py b/cloudify_rest_client/agents.py index d7832b5fa..af3c1b5b9 100644 --- a/cloudify_rest_client/agents.py +++ b/cloudify_rest_client/agents.py @@ -140,7 +140,6 @@ class AgentsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'agents' - self._wrapper_cls = Agent def list(self, _include=None, sort=None, is_descending=False, **kwargs): """List the agents installed from the manager. @@ -154,12 +153,11 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: kwargs['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=kwargs) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=kwargs, + wrapper=ListResponse.of(Agent), ) def get(self, name): @@ -168,8 +166,10 @@ def get(self, name): :param name: The name of the agent :return: The details of the agent """ - response = self.api.get('/{0}/{1}'.format(self._uri_prefix, name)) - return self._wrapper_cls(response) + return self.api.get( + '/{0}/{1}'.format(self._uri_prefix, name), + wrapper=Agent, + ) def create(self, name, node_instance_id, state=AgentState.CREATING, create_rabbitmq_user=True, **kwargs): @@ -185,9 +185,11 @@ def create(self, name, node_instance_id, state=AgentState.CREATING, 'state': state, 'create_rabbitmq_user': create_rabbitmq_user} data.update(kwargs) - response = self.api.put('/{0}/{1}'.format(self._uri_prefix, name), - data=data) - return self._wrapper_cls(response) + return self.api.put( + '/{0}/{1}'.format(self._uri_prefix, name), + data=data, + wrapper=Agent, + ) def update(self, name, state): """Update agent with the provided state. @@ -197,9 +199,11 @@ def update(self, name, state): :return: The updated agent """ data = {'state': state} - response = self.api.patch('/{0}/{1}'.format(self._uri_prefix, name), - data=data) - return self._wrapper_cls(response) + return self.api.patch( + '/{0}/{1}'.format(self._uri_prefix, name), + data=data, + wrapper=Agent, + ) def replace_ca_certs(self, bundle, @@ -227,6 +231,7 @@ def replace_ca_certs(self, 'manager_ca_cert': manager_ca_cert_str } - response = self.api.patch('/' + self._uri_prefix, data=data) - - return response + return self.api.patch( + '/' + self._uri_prefix, + data=data, + ) diff --git a/cloudify_rest_client/audit_log.py b/cloudify_rest_client/audit_log.py index f592efdf9..51aa4a14b 100644 --- a/cloudify_rest_client/audit_log.py +++ b/cloudify_rest_client/audit_log.py @@ -68,11 +68,11 @@ def list(self, get_all=False, **kwargs): if get_all: params['size'] = 0 params['offset'] = 0 - response = self.api.get('/audit', params=params) - - return ListResponse( - [AuditLog(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/audit', + params=params, + wrapper=ListResponse.of(AuditLog), + ) def delete(self, **kwargs): """Delete (some) of the AuditLogs. @@ -82,12 +82,15 @@ def delete(self, **kwargs): :return: DeletedResponse describing deletion outcome - a number of 'deleted' records. """ - response = self.api.delete('/audit', params=kwargs) - return DeletedResponse(**response) + return self.api.delete( + '/audit', + params=kwargs, + wrapper=DeletedResponse, + ) def inject(self, logs): """Inject audit logs. Intended for internal use only. :param logs: List of dict log entries to inject. """ - self.api.post('/audit', data=logs) + return self.api.post('/audit', data=logs) diff --git a/cloudify_rest_client/blueprints.py b/cloudify_rest_client/blueprints.py index 314d06342..4955349ee 100644 --- a/cloudify_rest_client/blueprints.py +++ b/cloudify_rest_client/blueprints.py @@ -118,7 +118,6 @@ class BlueprintsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'blueprints' - self._wrapper_cls = Blueprint def _prepare_put_request( self, @@ -254,6 +253,7 @@ def callback_wrapper(watcher): data=multipart, headers={'Content-Type': multipart.content_type}, expected_status_code=expected_status, + wrapper=Blueprint, ) def _validate(self, @@ -330,18 +330,25 @@ def list(self, _include=None, sort=None, is_descending=False, params['_filter_id'] = filter_id if filter_rules: - response = self.api.post('/searches/blueprints', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/blueprints', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Blueprint), + ) elif constraints: - response = self.api.post('/searches/blueprints', params=params, - data={'constraints': constraints}) + return self.api.post( + '/searches/blueprints', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(Blueprint), + ) else: - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + wrapper=ListResponse.of(Blueprint), + ) def publish_archive( self, @@ -378,7 +385,7 @@ def publish_archive( blueprint's unique Id. """ - response = self._upload( + return self._upload( archive_location, blueprint_id=blueprint_id, application_file_name=blueprint_filename, @@ -391,8 +398,6 @@ def publish_archive( skip_execution=skip_execution, requirements=requirements, ) - if not async_upload: - return self._wrapper_cls(response) @staticmethod def calc_size(blueprint_path): @@ -448,7 +453,7 @@ def upload( tar_path, application_file = self._validate_blueprint_size( path, tempdir, skip_size_limit) - response = self._upload( + return self._upload( tar_path, blueprint_id=entity_id, application_file_name=application_file, @@ -463,8 +468,6 @@ def upload( legacy=legacy, requirements=requirements, ) - if not async_upload: - return self._wrapper_cls(response) finally: shutil.rmtree(tempdir) @@ -501,7 +504,7 @@ def validate(self, tar_path, application_file = self._validate_blueprint_size( path, tempdir, skip_size_limit) - response = self._validate( + return self._validate( tar_path or path, blueprint_id=entity_id, application_file_name=application_file or blueprint_filename, @@ -510,10 +513,6 @@ def validate(self, finally: shutil.rmtree(tempdir) - if response: - # on cloudify earlier than 6.4, response is None (204 no content) - return response - def get(self, blueprint_id, _include=None): """ Gets a blueprint by its id. @@ -524,8 +523,11 @@ def get(self, blueprint_id, _include=None): """ assert blueprint_id uri = '/{self._uri_prefix}/{id}'.format(self=self, id=blueprint_id) - response = self.api.get(uri, _include=_include) - return self._wrapper_cls(response) + return self.api.get( + uri, + _include=_include, + wrapper=Blueprint, + ) def delete(self, blueprint_id, force=False): """ @@ -538,7 +540,7 @@ def delete(self, blueprint_id, force=False): """ assert blueprint_id - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{id}'.format(self=self, id=blueprint_id), params={'force': force}) @@ -607,13 +609,12 @@ def update(self, blueprint_id, update_dict): :param update_dict: Dictionary of attributes and values to be updated. :return: The updated blueprint. """ - response = self.api.patch('/{self._uri_prefix}/{id}'.format( + return self.api.patch('/{self._uri_prefix}/{id}'.format( self=self, id=blueprint_id), - data=update_dict + data=update_dict, + wrapper=Blueprint, ) - return self._wrapper_cls(response) - def upload_archive(self, blueprint_id, archive_path): """ Upload an archive for an existing a blueprint. @@ -630,7 +631,7 @@ def upload_archive(self, blueprint_id, archive_path): archive_data = bytes_stream_utils.request_data_file_stream( archive_path, client=self.api) - self.api.put('/{self._uri_prefix}/{id}/archive'.format( + return self.api.put('/{self._uri_prefix}/{id}/archive'.format( self=self, id=blueprint_id), data=archive_data ) @@ -646,7 +647,7 @@ def upload_icon(self, blueprint_id, icon_path): icon_data = bytes_stream_utils.request_data_file_stream( icon_path, client=self.api) - self.api.patch('/{self._uri_prefix}/{id}/icon'.format( + return self.api.patch('/{self._uri_prefix}/{id}/icon'.format( self=self, id=blueprint_id), data=icon_data ) @@ -657,6 +658,6 @@ def remove_icon(self, blueprint_id): :param blueprint_id: Blueprint's id to update. """ - self.api.patch('/{self._uri_prefix}/{id}/icon'.format( + return self.api.patch('/{self._uri_prefix}/{id}/icon'.format( self=self, id=blueprint_id), ) diff --git a/cloudify_rest_client/bytes_stream_utils.py b/cloudify_rest_client/bytes_stream_utils.py index 6eaeafe85..822da7ca8 100644 --- a/cloudify_rest_client/bytes_stream_utils.py +++ b/cloudify_rest_client/bytes_stream_utils.py @@ -15,7 +15,7 @@ def request_data_file_stream(file_path, :param progress_callback: Callback function - can be used to print progress :return: File data or generator object """ - if client and client.has_kerberos() and not client.has_auth_header(): + if client and client.has_kerberos and not client.has_auth_header(): # kerberos currently does not support chunks with open(file_path, 'rb') as f: data = f.read() diff --git a/cloudify_rest_client/client.py b/cloudify_rest_client/client.py index 368d4add2..9a5ba3953 100644 --- a/cloudify_rest_client/client.py +++ b/cloudify_rest_client/client.py @@ -1,6 +1,10 @@ +import itertools import json import logging import numbers +import random +import re +import types import requests from base64 import b64encode @@ -58,7 +62,6 @@ from cloudify_rest_client.workflows import WorkflowsClient from cloudify_rest_client.audit_log import AuditLogClient from cloudify_rest_client.community_contacts import CommunityContactsClient -from cloudify_async_client.audit_log import AuditLogAsyncClient try: from requests_kerberos import HTTPKerberosAuth @@ -80,15 +83,32 @@ urllib3.disable_warnings(urllib3.exceptions.InsecurePlatformWarning) -class HTTPClient(object): +class HTTPClientBase: + def __init__( + self, + host, + port=DEFAULT_PORT, + protocol=DEFAULT_PROTOCOL, + api_version=DEFAULT_API_VERSION, + headers=None, + query_params=None, + cert=None, + trust_all=False, + username=None, + password=None, + token=None, + tenant=None, + kerberos_env=None, + timeout=None, + retries=None, + ): + hosts = list(host) if isinstance(host, list) else [host] + hosts = [ipv6_url_compat(h) for h in hosts] + random.shuffle(hosts) + self.hosts = itertools.cycle(hosts) + self.retries = retries or len(hosts) - def __init__(self, host, port=DEFAULT_PORT, - protocol=DEFAULT_PROTOCOL, api_version=DEFAULT_API_VERSION, - headers=None, query_params=None, cert=None, trust_all=False, - username=None, password=None, token=None, tenant=None, - kerberos_env=None, timeout=None, session=None): self.port = port - self.host = ipv6_url_compat(host) self.protocol = protocol self.api_version = api_version self.kerberos_env = kerberos_env @@ -106,19 +126,169 @@ def __init__(self, host, port=DEFAULT_PORT, log_value=False) self._set_header(constants.CLOUDIFY_TOKEN_AUTHENTICATION_HEADER, token) self._set_header(CLOUDIFY_TENANT_HEADER, tenant) - if session is None: - session = requests.Session() - self._session = session + self._has_kerberos = None - @property - def url(self): - return '{0}://{1}:{2}/api/{3}'.format(self.protocol, self.host, - self.port, self.api_version) - - def has_kerberos(self): if self.kerberos_env is not None: - return self.kerberos_env - return bool(HTTPKerberosAuth) and is_kerberos_env() + self.has_kerberos = True + else: + self.has_kerberos = bool(HTTPKerberosAuth) and is_kerberos_env() + + if self.has_kerberos: + self.auth = self._make_kerberos_auth() + else: + self.auth = None + + def _make_kerberos_auth(self): + if self.has_kerberos and not self.has_auth_header(): + if HTTPKerberosAuth is None: + raise exceptions.CloudifyClientError( + 'Trying to create a client with kerberos, ' + 'but kerberos_env does not exist') + return HTTPKerberosAuth() + + def _get_total_headers(self, headers): + total_headers = self.headers.copy() + if headers: + total_headers.update(headers) + return total_headers + + def _get_total_params(self, params): + total_params = self.query_params.copy() + if params: + total_params.update(params) + return { + k: self._format_querystring_param(v) + for k, v in total_params.items() + if k is not None and v is not None + } + + def _format_querystring_param(self, param): + if isinstance(param, bool): + return str(param) + return param + + def get_host(self): + return next(self.hosts) + + def get_request_url(self, host, uri, versioned=True): + base_url = f'{self.protocol}://{host}:{self.port}/api' + if not versioned: + return f'{base_url}{uri}' + return f'{base_url}/{self.api_version}{uri}' + + def _log_request(self, method, uri, data): + if not self.logger.isEnabledFor(logging.DEBUG): + return + self.logger.debug( + 'Sending request: %s %s; body: %r', + method, + uri, + data if not data or isinstance(data, dict) else '(bytes data)', + ) + + def get(self, uri, data=None, params=None, headers=None, _include=None, + expected_status_code=200, stream=False, timeout=None, + versioned_url=True, wrapper=None): + if _include: + fields = ','.join(_include) + if not params: + params = {} + params['_include'] = fields + + self._log_request('GET', uri, data=data) + return self.do_request( + 'GET', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def put(self, uri, data=None, params=None, headers=None, + expected_status_code=200, stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('PUT', uri, data) + return self.do_request( + 'PUT', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def patch(self, uri, data=None, params=None, headers=None, + expected_status_code=200, stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('PATCH', uri, data) + return self.do_request( + 'PATCH', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def post(self, uri, data=None, params=None, headers=None, + expected_status_code=200, stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('POST', uri, data) + return self.do_request( + 'POST', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def delete(self, uri, data=None, params=None, headers=None, + expected_status_code=(200, 204), stream=False, timeout=None, + wrapper=None, versioned_url=True): + self._log_request('DELETE', uri, data) + return self.do_request( + 'DELETE', + uri, + data=data, + params=self._get_total_params(params), + headers=self._get_total_headers(headers), + expected_status_code=expected_status_code, + stream=stream, + timeout=timeout, + wrapper=wrapper, + versioned_url=versioned_url, + ) + + def _get_auth_header(self, username, password): + if not username or not password: + return None + credentials = '{0}:{1}'.format(username, password).encode('utf-8') + encoded_credentials = b64encode(credentials).decode('utf-8') + return BASIC_AUTH_PREFIX + ' ' + encoded_credentials + + def _set_header(self, key, value, log_value=True): + if not value: + return + self.headers[key] = value + value = value if log_value else '*' def has_auth_header(self): auth_headers = [constants.CLOUDIFY_AUTHENTICATION_HEADER, @@ -126,60 +296,51 @@ def has_auth_header(self): constants.CLOUDIFY_TOKEN_AUTHENTICATION_HEADER] return any(header in self.headers for header in auth_headers) - def _raise_client_error(self, response, url=None): - try: - result = response.json() - except Exception: - if response.status_code == 304: - error_msg = 'Nothing to modify' - self._prepare_and_raise_exception( - message=error_msg, - error_code='not_modified', - status_code=response.status_code, - server_traceback='') - else: - message = response.content - if url: - message = '{0} [{1}]'.format(message, url) - error_msg = '{0}: {1}'.format(response.status_code, message) - raise exceptions.CloudifyClientError( - error_msg, - status_code=response.status_code, - response=response) - # this can be changed after RD-3539 - message = result.get('message') or result.get('detail') - code = result.get('error_code') - server_traceback = result.get('server_traceback') - self._prepare_and_raise_exception( - message=message, - error_code=code, - status_code=response.status_code, - server_traceback=server_traceback, - response=response) - - @staticmethod - def _prepare_and_raise_exception(message, - error_code, - status_code, - server_traceback=None, - response=None): - - error = exceptions.ERROR_MAPPING.get(error_code, - exceptions.CloudifyClientError) - raise error(message, server_traceback, - status_code, error_code=error_code, response=response) - - def verify_response_status(self, response, expected_code=200): - if response.status_code != expected_code: - self._raise_client_error(response) - - def _do_request(self, requests_method, request_url, body, params, headers, - expected_status_code, stream, verify, timeout): + def _is_fileserver_download(self, response): + """Is this response a file-download response? + + 404 responses to requests that download files, need to be retried + with all managers in the cluster: if some file was not yet + replicated, another manager might have this file. + + This is because the file replication is asynchronous. + """ + # str() the url because sometimes (aiohttp) it is a URL object + if re.search('/(blueprints|snapshots)/', str(response.url)): + return True + disposition = response.headers.get('Content-Disposition') + if not disposition: + return False + return disposition.strip().startswith('attachment') + + +class HTTPClient(HTTPClientBase): + def __init__(self, *args, **kwargs): + session = kwargs.pop('session', None) + super().__init__(*args, **kwargs) + + if session is None: + session = requests.Session() + self._session = session + + def do_request( + self, + method, + uri, + data, + params, + headers, + expected_status_code, + stream, + timeout, + wrapper, + versioned_url=True, + ): """Run a requests method. :param request_method: string choosing the method, eg "get" or "post" :param request_url: the URL to run the request against - :param body: request body, as a string + :param data: request data, dict or string :param params: querystring parameters, as a dict :param headers: request headers, as a dict :param expected_status_code: check that the response is this @@ -188,21 +349,76 @@ def _do_request(self, requests_method, request_url, body, params, headers, :param verify: the CA cert path :param timeout: request timeout or a (connect, read) timeouts pair """ - auth = None - if self.has_kerberos() and not self.has_auth_header(): - if HTTPKerberosAuth is None: - raise exceptions.CloudifyClientError( - 'Trying to create a client with kerberos, ' - 'but kerberos_env does not exist') - auth = HTTPKerberosAuth() - response = requests_method(request_url, - data=body, - params=params, - headers=headers, - stream=stream, - verify=verify, - timeout=timeout or self.default_timeout_sec, - auth=auth) + requests_method = getattr(self._session, method.lower(), None) + if requests_method is None: + raise RuntimeError(f'Unknown method: {method}') + + copied_data = None + if isinstance(data, types.GeneratorType): + copied_data = itertools.tee(data, self.retries) + elif isinstance(data, dict): + data = json.dumps(data) + + errors = {} + for retry in range(self.retries): + manager_to_try = self.get_host() + request_url = self.get_request_url( + manager_to_try, + uri, + versioned=versioned_url, + ) + if copied_data is not None: + data = copied_data[retry] + try: + response = requests_method( + request_url, + data=data, + params=params, + headers=headers, + stream=stream, + verify=self.get_request_verify(), + timeout=timeout or self.default_timeout_sec, + auth=self.auth, + ) + except requests.exceptions.SSLError as e: + errors[manager_to_try] = exceptions.format_ssl_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except requests.exceptions.ConnectionError as e: + errors[manager_to_try] = exceptions.format_connection_error(e) + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + continue + except exceptions.CloudifyClientError as e: + self.logger.debug( + 'HTTP Client error: %s %s: %s', method, uri, e) + errors[manager_to_try] = e.status_code + if e.response.status_code == 502: + continue + if e.response.status_code == 404 and \ + self._is_fileserver_download(e.response): + continue + else: + raise + + return self.process_response( + response, + expected_status_code, + stream, + wrapper, + ) + mgr_errors = ', '.join(f'{host}: {e}' for host, e in errors.items()) + raise exceptions.CloudifyClientError( + f'HTTP Client error: {method} {uri} ({mgr_errors})') + + def process_response( + self, + response, + expected_status_code, + stream, + wrapper + ): if self.logger.isEnabledFor(logging.DEBUG): for hdr, hdr_content in response.request.headers.items(): self.logger.debug('request header: %s: %s', hdr, hdr_content) @@ -214,7 +430,8 @@ def _do_request(self, requests_method, request_url, body, params, headers, if isinstance(expected_status_code, numbers.Number): expected_status_code = [expected_status_code] if response.status_code not in expected_status_code: - self._raise_client_error(response, request_url) + raise exceptions.CloudifyClientError.from_response( + response, response.status_code, response.content) if response.status_code == 204: return None @@ -227,6 +444,8 @@ def _do_request(self, requests_method, request_url, body, params, headers, if response.history: response_json['history'] = response.history + if wrapper: + return wrapper(response_json) return response_json def get_request_verify(self): @@ -239,152 +458,6 @@ def get_request_verify(self): # verify the certificate return True - def do_request(self, - requests_method, - uri, - data=None, - params=None, - headers=None, - expected_status_code=200, - stream=False, - versioned_url=True, - timeout=None): - if versioned_url: - request_url = '{0}{1}'.format(self.url, uri) - else: - # remove version from url ending - url = self.url.rsplit('/', 1)[0] - request_url = '{0}{1}'.format(url, uri) - - # build headers - headers = headers or {} - total_headers = self.headers.copy() - total_headers.update(headers) - - # build query params - params = params or {} - total_params = self.query_params.copy() - total_params.update(params) - - # data is either dict, bytes data or None - is_dict_data = isinstance(data, dict) - body = json.dumps(data) if is_dict_data else data - if self.logger.isEnabledFor(logging.DEBUG): - log_message = 'Sending request: {0} {1}'.format( - requests_method.__name__.upper(), - request_url) - if is_dict_data: - log_message += '; body: {0}'.format(body) - elif data is not None: - log_message += '; body: bytes data' - self.logger.debug(log_message) - try: - return self._do_request( - requests_method=requests_method, request_url=request_url, - body=body, params=total_params, headers=total_headers, - expected_status_code=expected_status_code, stream=stream, - verify=self.get_request_verify(), timeout=timeout) - except requests.exceptions.SSLError as e: - # Special handling: SSL Verification Error. - # We'd have liked to use `__context__` but this isn't supported in - # Py26, so as long as we support Py26, we need to go about this - # awkwardly. - if len(e.args) > 0 and 'CERTIFICATE_VERIFY_FAILED' in str( - e.args[0]): - raise requests.exceptions.SSLError( - 'Certificate verification failed; please ensure that the ' - 'certificate presented by Cloudify Manager is trusted ' - '(underlying reason: {0})'.format(e)) - raise requests.exceptions.SSLError( - 'An SSL-related error has occurred. This can happen if the ' - 'specified REST certificate does not match the certificate on ' - 'the manager. Underlying reason: {0}'.format(e)) - except requests.exceptions.ConnectionError as e: - raise requests.exceptions.ConnectionError( - '{0}' - '\nAn error occurred when trying to connect to the manager,' - 'please make sure it is online and all required ports are ' - 'open.' - '\nThis can also happen when the manager is not working with ' - 'SSL, but the client does'.format(e) - ) - - def get(self, uri, data=None, params=None, headers=None, _include=None, - expected_status_code=200, stream=False, versioned_url=True, - timeout=None): - if _include: - fields = ','.join(_include) - if not params: - params = {} - params['_include'] = fields - return self.do_request(self._session.get, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - versioned_url=versioned_url, - timeout=timeout) - - def put(self, uri, data=None, params=None, headers=None, - expected_status_code=200, stream=False, timeout=None): - return self.do_request(self._session.put, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def patch(self, uri, data=None, params=None, headers=None, - expected_status_code=200, stream=False, timeout=None): - return self.do_request(self._session.patch, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def post(self, uri, data=None, params=None, headers=None, - expected_status_code=200, stream=False, timeout=None): - return self.do_request(self._session.post, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def delete(self, uri, data=None, params=None, headers=None, - expected_status_code=(200, 204), stream=False, timeout=None): - return self.do_request(self._session.delete, - uri, - data=data, - params=params, - headers=headers, - expected_status_code=expected_status_code, - stream=stream, - timeout=timeout) - - def _get_auth_header(self, username, password): - if not username or not password: - return None - credentials = '{0}:{1}'.format(username, password).encode('utf-8') - encoded_credentials = b64encode(credentials).decode('utf-8') - return BASIC_AUTH_PREFIX + ' ' + encoded_credentials - - def _set_header(self, key, value, log_value=True): - if not value: - return - self.headers[key] = value - value = value if log_value else '*' - self.logger.debug('Setting `%s` header: %s', key, value) - class StreamedResponse(object): @@ -409,11 +482,25 @@ class CloudifyClient(object): """Cloudify's management client.""" client_class = HTTPClient - def __init__(self, host='localhost', port=None, protocol=DEFAULT_PROTOCOL, - api_version=DEFAULT_API_VERSION, headers=None, - query_params=None, cert=None, trust_all=False, - username=None, password=None, token=None, tenant=None, - kerberos_env=None, timeout=None, session=None): + def __init__( + self, + host='localhost', + port=None, + protocol=DEFAULT_PROTOCOL, + api_version=DEFAULT_API_VERSION, + headers=None, + query_params=None, + cert=None, + trust_all=False, + username=None, + password=None, + token=None, + tenant=None, + kerberos_env=None, + timeout=None, + session=None, + retries=None, + ): """ Creates a Cloudify client with the provided host and optional port. @@ -434,22 +521,38 @@ def __init__(self, host='localhost', port=None, protocol=DEFAULT_PROTOCOL, :param timeout: Requests timeout value. If not set, will default to (5, None)- 5 seconds connect timeout, no read timeout. :param session: a requests.Session to use for all HTTP calls + :param retries: requests that fail with a connection error will be + retried this many times + :param retry_interval: wait this many seconds between retries :return: Cloudify client instance. """ if not port: if protocol == SECURED_PROTOCOL: - # SSL port = SECURED_PORT else: port = DEFAULT_PORT self.host = host - self._client = self.client_class(host, port, protocol, api_version, - headers, query_params, cert, - trust_all, username, password, - token, tenant, kerberos_env, timeout, - session) + self._client = self.client_class( + host=host, + port=port, + protocol=protocol, + api_version=api_version, + headers=headers, + query_params=query_params, + cert=cert, + trust_all=trust_all, + username=username, + password=password, + token=token, + tenant=tenant, + kerberos_env=kerberos_env, + timeout=timeout, + session=session, + retries=retries, + ) + self.blueprints = BlueprintsClient(self._client) self.idp = IdentityProviderClient(self._client) self.permissions = PermissionsClient(self._client) @@ -493,7 +596,4 @@ def __init__(self, host='localhost', port=None, protocol=DEFAULT_PROTOCOL, self.blueprints_labels = BlueprintsLabelsClient(self._client) self.workflows = WorkflowsClient(self._client) self.community_contacts = CommunityContactsClient(self._client) - if AuditLogAsyncClient is None: - self.auditlog = AuditLogClient(self._client) - else: - self.auditlog = AuditLogAsyncClient(self._client) + self.auditlog = AuditLogClient(self._client) diff --git a/cloudify_rest_client/deployment_modifications.py b/cloudify_rest_client/deployment_modifications.py index 7851f0d76..412d55b19 100644 --- a/cloudify_rest_client/deployment_modifications.py +++ b/cloudify_rest_client/deployment_modifications.py @@ -131,9 +131,12 @@ def list(self, deployment_id=None, _include=None, **kwargs): params.update(kwargs) uri = '/deployment-modifications' - response = self.api.get(uri, params=params, _include=_include) - items = [DeploymentModification(item) for item in response['items']] - return ListResponse(items, response['metadata']) + return self.api.get( + uri, + params=params, + _include=_include, + wrapper=ListResponse.of(DeploymentModification), + ) def start(self, deployment_id, nodes, context=None): """Start deployment modification. @@ -152,19 +155,23 @@ def start(self, deployment_id, nodes, context=None): if context is not None: data['context'] = context - uri = '/deployment-modifications' - response = self.api.post(uri, data, - expected_status_code=201) - return DeploymentModification(response) + return self.api.post( + '/deployment-modifications', + data, + expected_status_code=201, + wrapper=DeploymentModification, + ) def get(self, modification_id, _include=None): """Get deployment modification :param modification_id: The modification id """ - uri = '/deployment-modifications/{0}'.format(modification_id) - response = self.api.get(uri, _include=_include) - return DeploymentModification(response) + return self.api.get( + '/deployment-modifications/{0}'.format(modification_id), + _include=_include, + wrapper=DeploymentModification, + ) def finish(self, modification_id): """Finish deployment modification @@ -173,9 +180,10 @@ def finish(self, modification_id): """ assert modification_id - uri = '/deployment-modifications/{0}/finish'.format(modification_id) - response = self.api.post(uri) - return DeploymentModification(response) + return self.api.post( + '/deployment-modifications/{0}/finish'.format(modification_id), + wrapper=DeploymentModification, + ) def rollback(self, modification_id): """Rollback deployment modification @@ -184,6 +192,7 @@ def rollback(self, modification_id): """ assert modification_id - uri = '/deployment-modifications/{0}/rollback'.format(modification_id) - response = self.api.post(uri) - return DeploymentModification(response) + return self.api.post( + '/deployment-modifications/{0}/rollback'.format(modification_id), + wrapper=DeploymentModification, + ) diff --git a/cloudify_rest_client/deployment_updates.py b/cloudify_rest_client/deployment_updates.py index 18a7a7566..494efa083 100644 --- a/cloudify_rest_client/deployment_updates.py +++ b/cloudify_rest_client/deployment_updates.py @@ -13,16 +13,6 @@ # * See the License for the specific language governing permissions and # * limitations under the License. -import os -import json -import shutil -import tempfile -from urllib.parse import quote as urlquote, urlparse -from urllib.request import pathname2url - -from mimetypes import MimeTypes - -from cloudify_rest_client import utils from cloudify_rest_client.responses import ListResponse @@ -96,8 +86,7 @@ def create(self, update_id, deployment_id, **kwargs): 'deployment_id': deployment_id, } data.update(kwargs) - response = self.api.put(url, data=data) - return DeploymentUpdate(response) + return self.api.put(url, data=data, wrapper=DeploymentUpdate) def set_attributes(self, update_id, **kwargs): """Update a deployment-update object with the given attributes. @@ -105,8 +94,10 @@ def set_attributes(self, update_id, **kwargs): This is only useful from within the deployment-update workflow. Do not use this otherwise. """ - url = '/deployment-updates/{0}'.format(update_id) - self.api.patch(url, data=kwargs) + return self.api.patch( + '/deployment-updates/{0}'.format(update_id), + data=kwargs, + ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """List deployment updates @@ -123,79 +114,20 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get(uri, params=params, _include=_include) - items = [DeploymentUpdate(item) for item in response['items']] - return ListResponse(items, response['metadata']) + return self.api.get( + uri, + params=params, + _include=_include, + wrapper=ListResponse.of(DeploymentUpdate), + ) def bulk_insert(self, updates): """Bulk insert deployment updates. For internal use only.""" - uri = '/deployment-updates' - self.api.post(uri, {'deployment_updates': updates}, - expected_status_code=[201, 204]) - - def _update_from_blueprint(self, - deployment_id, - blueprint_path, - inputs=None): - """Create a deployment update transaction for blueprint app. - - :param deployment_id: The deployment id - :param blueprint_path: the path of the blueprint to stage - """ - assert deployment_id - - tempdir = tempfile.mkdtemp() - try: - tar_path = utils.tar_blueprint(blueprint_path, tempdir) - application_filename = os.path.basename(blueprint_path) - - return self._update_from_archive(deployment_id, - tar_path, - application_filename, - inputs=inputs) - finally: - shutil.rmtree(tempdir) - - @staticmethod - def _update_from_archive(deployment_id, - archive_path, - application_file_name=None, - inputs=None): - """Create a deployment update transaction for an archived app. - - :param archive_path: the path for the archived app. - :param application_file_name: the main blueprint filename. - :param deployment_id: the deployment id to update. - :return: DeploymentUpdate dict - :rtype: DeploymentUpdate - """ - assert deployment_id - - mime_types = MimeTypes() - - data_form = {} - params = {} - # all the inputs are passed through the query - if inputs: - data_form['inputs'] = ('inputs', json.dumps(inputs), 'text/plain') - - if application_file_name: - params['application_file_name'] = urlquote(application_file_name) - - # For a Windows path (e.g. "C:\aaa\bbb.zip") scheme is the - # drive letter and therefore the 2nd condition is present - if all([urlparse(archive_path).scheme, - not os.path.exists(archive_path)]): - # archive location is URL - params['blueprint_archive_url'] = archive_path - else: - data_form['blueprint_archive'] = ( - os.path.basename(archive_path), - open(archive_path, 'rb'), - # Guess the archive mime type - mime_types.guess_type(pathname2url(archive_path))) - - return data_form, params + return self.api.post( + '/deployment-updates', + {'deployment_updates': updates}, + expected_status_code=[201, 204], + ) def get(self, update_id, _include=None): """Get deployment update @@ -203,8 +135,7 @@ def get(self, update_id, _include=None): :param update_id: The update id """ uri = '/deployment-updates/{0}'.format(update_id) - response = self.api.get(uri, _include=_include) - return DeploymentUpdate(response) + return self.api.get(uri, _include=_include, wrapper=DeploymentUpdate) def update_with_existing_blueprint( self, @@ -254,8 +185,7 @@ def update_with_existing_blueprint( if reevaluate_active_statuses is not None: data['reevaluate_active_statuses'] = reevaluate_active_statuses uri = '/deployment-updates/{0}/update/initiate'.format(deployment_id) - response = self.api.post(uri, data=data) - return DeploymentUpdate(response) + return self.api.post(uri, data=data, wrapper=DeploymentUpdate) def finalize_commit(self, update_id): """Finalize the committing process @@ -266,5 +196,4 @@ def finalize_commit(self, update_id): assert update_id uri = '/deployment-updates/{0}/update/finalize'.format(update_id) - response = self.api.post(uri) - return DeploymentUpdate(response) + return self.api.post(uri, wrapper=DeploymentUpdate) diff --git a/cloudify_rest_client/deployments.py b/cloudify_rest_client/deployments.py index 2bcd4169c..ecf12bf5f 100644 --- a/cloudify_rest_client/deployments.py +++ b/cloudify_rest_client/deployments.py @@ -369,15 +369,18 @@ def list(self, _include=None, **kwargs): if _include: params['_include'] = ','.join(_include) - response = self.api.get('/deployment-groups', params=params) - return ListResponse( - [DeploymentGroup(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/deployment-groups', + params=params, + wrapper=ListResponse.of(DeploymentGroup), + ) def get(self, group_id): """Get the specified deployment group.""" - response = self.api.get('/deployment-groups/{0}'.format(group_id)) - return DeploymentGroup(response) + return self.api.get( + '/deployment-groups/{0}'.format(group_id), + wrapper=DeploymentGroup, + ) def put(self, group_id, visibility=VisibilityState.TENANT, description=None, blueprint_id=None, default_inputs=None, @@ -430,10 +433,10 @@ def put(self, group_id, visibility=VisibilityState.TENANT, data['created_by'] = created_by if creation_counter: data['creation_counter'] = creation_counter - response = self.api.put( + return self.api.put( '/deployment-groups/{0}'.format(group_id), data=data, + wrapper=DeploymentGroup, ) - return DeploymentGroup(response) def add_deployments(self, group_id, deployment_ids=None, count=None, new_deployments=None, filter_id=None, @@ -474,6 +477,7 @@ def add_deployments(self, group_id, deployment_ids=None, count=None, else: batches = [new_deployments] + # TODO this is not async-friendly for new_deployments_batch in batches: response = self.api.patch( '/deployment-groups/{0}'.format(group_id), @@ -504,7 +508,7 @@ def remove_deployments(self, group_id, deployment_ids=None, group given by this id :return: the updated deployment group """ - response = self.api.patch( + return self.api.patch( '/deployment-groups/{0}'.format(group_id), data={ 'remove': { @@ -513,9 +517,9 @@ def remove_deployments(self, group_id, deployment_ids=None, 'filter_rules': filter_rules, 'deployments_from_group': deployments_from_group, } - } + }, + wrapper=DeploymentGroup, ) - return DeploymentGroup(response) def delete(self, group_id, delete_deployments=False, force=False, with_logs=False): @@ -527,7 +531,7 @@ def delete(self, group_id, delete_deployments=False, :param force: same meaning as in deployments.delete :param with_logs: same meaning as in deployments.delete """ - self.api.delete( + return self.api.delete( '/deployment-groups/{0}'.format(group_id), params={ 'delete_deployments': delete_deployments, @@ -551,8 +555,7 @@ def get(self, deployment_id): """ assert deployment_id uri = '/deployments/{0}/outputs'.format(deployment_id) - response = self.api.get(uri) - return DeploymentOutputs(response) + return self.api.get(uri, wrapper=DeploymentOutputs) class DeploymentCapabilitiesClient(object): @@ -568,8 +571,7 @@ def get(self, deployment_id): """ assert deployment_id uri = '/deployments/{0}/capabilities'.format(deployment_id) - response = self.api.get(uri) - return DeploymentCapabilities(response) + return self.api.get(uri, wrapper=DeploymentCapabilities) def list(self, deployment_id, _include=None, constraints=None, **kwargs): """ @@ -592,11 +594,11 @@ def list(self, deployment_id, _include=None, constraints=None, **kwargs): constraints = dict() constraints['deployment_id'] = deployment_id - response = self.api.post('/searches/capabilities', params=params, - data={'constraints': constraints}) - return ListResponse( - items=[DeploymentCapabilities(item) for item in response['items']], - metadata=response['metadata'] + return self.api.post( + '/searches/capabilities', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(DeploymentCapabilities), ) @@ -628,11 +630,11 @@ def list(self, blueprint_id=None, deployment_id=None, if _include: params['_include'] = ','.join(_include) - response = self.api.post('/searches/scaling-groups', params=params, - data={'constraints': constraints or {}}) - return ListResponse( - items=[DeploymentScalingGroup(item) for item in response['items']], - metadata=response['metadata'] + return self.api.post( + '/searches/scaling-groups', + params=params, + data={'constraints': constraints or {}}, + wrapper=ListResponse.of(DeploymentScalingGroup), ) @@ -674,16 +676,25 @@ def list(self, _include=None, sort=None, is_descending=False, params['_filter_id'] = filter_id if filter_rules: - response = self.api.post('/searches/deployments', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/deployments', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Deployment), + ) elif constraints: - response = self.api.post('/searches/deployments', params=params, - data={'constraints': constraints}) + return self.api.post( + '/searches/deployments', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(Deployment), + ) else: - response = self.api.get('/deployments', params=params) - - return ListResponse([Deployment(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/deployments', + params=params, + wrapper=ListResponse.of(Deployment), + ) def get(self, deployment_id, @@ -704,13 +715,13 @@ def get(self, """ assert deployment_id uri = '/deployments/{0}'.format(deployment_id) - response = self.api.get( + return self.api.get( uri, _include=_include, params={'all_sub_deployments': all_sub_deployments, - 'include_workdir': include_workdir} + 'include_workdir': include_workdir}, + wrapper=Deployment, ) - return Deployment(response) def create(self, blueprint_id, @@ -822,9 +833,13 @@ def create(self, if async_create is not None: # if it's None, we just keep the server's default behaviour params['async_create'] = async_create - response = self.api.put( - uri, data, params=params, expected_status_code=201) - return Deployment(response) + return self.api.put( + uri, + data, + params=params, + expected_status_code=201, + wrapper=Deployment, + ) def delete(self, deployment_id, force=False, @@ -850,7 +865,7 @@ def delete(self, deployment_id, warnings.warn('delete_db_mode is deprecated and does nothing', DeprecationWarning) - self.api.delete( + return self.api.delete( '/deployments/{0}'.format(deployment_id), params=params) def set_visibility(self, deployment_id, visibility): @@ -900,9 +915,11 @@ def update_labels(self, deployment_id, labels, creator=None, data['creator'] = creator if created_at: data['created_at'] = created_at - updated_dep = self.api.patch( - '/deployments/{0}'.format(deployment_id), data=data) - return Deployment(updated_dep) + return self.api.patch( + '/deployments/{0}'.format(deployment_id), + data=data, + wrapper=Deployment, + ) def set_attributes(self, deployment_id, **kwargs): """Set arbitrary properties on the deployment. @@ -912,6 +929,8 @@ def set_attributes(self, deployment_id, **kwargs): For internal use only. """ - updated_dep = self.api.patch( - '/deployments/{0}'.format(deployment_id), data=kwargs) - return Deployment(updated_dep) + return self.api.patch( + '/deployments/{0}'.format(deployment_id), + data=kwargs, + wrapper=Deployment, + ) diff --git a/cloudify_rest_client/evaluate.py b/cloudify_rest_client/evaluate.py index 985a4f1c5..893a64c77 100644 --- a/cloudify_rest_client/evaluate.py +++ b/cloudify_rest_client/evaluate.py @@ -55,9 +55,12 @@ def functions(self, deployment_id, context, payload): :rtype: EvaluatedFunctions """ assert deployment_id - result = self.api.post('/evaluate/functions', data={ - 'deployment_id': deployment_id, - 'context': context, - 'payload': payload - }) - return EvaluatedFunctions(result) + return self.api.post( + '/evaluate/functions', + data={ + 'deployment_id': deployment_id, + 'context': context, + 'payload': payload + }, + wrapper=EvaluatedFunctions, + ) diff --git a/cloudify_rest_client/events.py b/cloudify_rest_client/events.py index 31e17e573..cfc17edce 100644 --- a/cloudify_rest_client/events.py +++ b/cloudify_rest_client/events.py @@ -1,4 +1,3 @@ -import warnings from datetime import datetime from cloudify_rest_client.responses import ListResponse @@ -9,34 +8,6 @@ class EventsClient(object): def __init__(self, api): self.api = api - def get(self, - execution_id, - from_event=0, - batch_size=100, - include_logs=False): - """ - Returns event for the provided execution id. - - :param execution_id: Id of execution to get events for. - :param from_event: Index of first event to retrieve on pagination. - :param batch_size: Maximum number of events to retrieve per call. - :param include_logs: Whether to also get logs. - :return: Events list and total number of currently available - events (tuple). - """ - warnings.warn('method is deprecated, use "{0}" method instead' - .format(self.list.__name__), - DeprecationWarning) - - response = self.list(execution_id=execution_id, - include_logs=include_logs, - _offset=from_event, - _size=batch_size, - _sort='@timestamp') - events = response.items - total_events = response.metadata.pagination.total - return events, total_events - def list(self, include_logs=False, message=None, from_datetime=None, to_datetime=None, _include=None, sort=None, **kwargs): """List events @@ -59,8 +30,12 @@ def list(self, include_logs=False, message=None, from_datetime=None, sort=sort, **kwargs) - response = self.api.get(uri, _include=_include, params=params) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + uri, + _include=_include, + params=params, + wrapper=ListResponse.of(dict), + ) def create(self, events=None, logs=None, execution_id=None, agent_name=None, manager_name=None, @@ -84,7 +59,11 @@ def create(self, events=None, logs=None, execution_id=None, if execution_group_id: data['execution_group_id'] = execution_group_id - self.api.post('/events', data=data, expected_status_code=(201, 204)) + return self.api.post( + '/events', + data=data, + expected_status_code=(201, 204), + ) def delete(self, deployment_id, include_logs=False, message=None, from_datetime=None, to_datetime=None, sort=None, **kwargs): @@ -109,9 +88,11 @@ def delete(self, deployment_id, include_logs=False, message=None, deployment_id=deployment_id, **kwargs) - response = self.api.delete(uri, params=params, - expected_status_code=200) - return ListResponse(response['items'], response['metadata']) + return self.api.delete( + uri, params=params, + expected_status_code=200, + wrapper=ListResponse.of(lambda x: x), + ) @staticmethod def _create_query(include_logs=False, message=None, from_datetime=None, diff --git a/cloudify_rest_client/exceptions.py b/cloudify_rest_client/exceptions.py index 35ee790d2..f79045c5e 100644 --- a/cloudify_rest_client/exceptions.py +++ b/cloudify_rest_client/exceptions.py @@ -13,8 +13,67 @@ # * See the License for the specific language governing permissions and # * limitations under the License. +import json +import requests.exceptions + + +def format_ssl_error(e): + # Special handling: SSL Verification Error. + # We'd have liked to use `__context__` but this isn't supported in + # Py26, so as long as we support Py26, we need to go about this + # awkwardly. + if len(e.args) > 0 and 'CERTIFICATE_VERIFY_FAILED' in str( + e.args[0]): + return requests.exceptions.SSLError( + 'Certificate verification failed; please ensure that the ' + 'certificate presented by Cloudify Manager is trusted ' + '(underlying reason: {0})'.format(e)) + return requests.exceptions.SSLError( + 'An SSL-related error has occurred. This can happen if the ' + 'specified REST certificate does not match the certificate on ' + 'the manager. Underlying reason: {0}'.format(e)) + + +def format_connection_error(e): + return requests.exceptions.ConnectionError( + '{0}' + '\nAn error occurred when trying to connect to the manager,' + 'please make sure it is online and all required ports are ' + 'open.' + '\nThis can also happen when the manager is not working with ' + 'SSL, but the client does'.format(e) + ) + class CloudifyClientError(Exception): + @classmethod + def from_response(cls, response, status, response_content): + try: + result = json.loads(response_content.decode('utf-8')) + except Exception: + if status == 304: + return NotModifiedError( + message='Nothing to modify', + error_code=NotModifiedError.ERROR_CODE, + status_code=status, + ) + return cls( + message=f'{status}: {response_content}', + status_code=status, + response=response, + ) + + message = result.get('message') or result.get('detail') + code = result.get('error_code') + server_traceback = result.get('server_traceback') + error_cls = ERROR_MAPPING.get(code, cls) + return error_cls( + message, + server_traceback=server_traceback, + status_code=status, + error_code=code, + response=response, + ) def __init__(self, message, server_traceback=None, status_code=-1, error_code=None, response=None): @@ -282,8 +341,8 @@ def __init__(self, message, server_traceback=None, status_code=-1, error_code=None, response=None): super(InvalidFilterRule, self).__init__( message, server_traceback, status_code, error_code, response) - self.err_filter_rule = response.json().get('err_filter_rule') - self.err_reason = response.json().get('err_reason') + self.err_filter_rule = response.json.get('err_filter_rule') + self.err_reason = response.json.get('err_reason') class DeploymentParentNotFound(CloudifyClientError): diff --git a/cloudify_rest_client/execution_schedules.py b/cloudify_rest_client/execution_schedules.py index c18257411..aa71aee68 100644 --- a/cloudify_rest_client/execution_schedules.py +++ b/cloudify_rest_client/execution_schedules.py @@ -168,11 +168,13 @@ def create(self, schedule_id, deployment_id, workflow_id, if created_at: data['created_at'] = created_at uri = '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id) - response = self.api.put(uri, - data=data, - params=params, - expected_status_code=201) - return ExecutionSchedule(response) + return self.api.put( + uri, + data=data, + params=params, + expected_status_code=201, + wrapper=ExecutionSchedule, + ) def update(self, schedule_id, deployment_id, since=None, until=None, recurrence=None, count=None, weekdays=None, rrule=None, @@ -224,11 +226,13 @@ def update(self, schedule_id, deployment_id, since=None, until=None, 'workflow_id': workflow_id, } uri = '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id) - response = self.api.patch(uri, - data=data, - params=params, - expected_status_code=201) - return ExecutionSchedule(response) + return self.api.patch( + uri, + data=data, + params=params, + expected_status_code=201, + wrapper=ExecutionSchedule, + ) def delete(self, schedule_id, deployment_id): """ @@ -239,10 +243,11 @@ def delete(self, schedule_id, deployment_id): """ assert schedule_id params = {'deployment_id': deployment_id} - self.api.delete('/{self._uri_prefix}/{id}'.format(self=self, - id=schedule_id), - params=params, - expected_status_code=204) + return self.api.delete( + '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id), + params=params, + expected_status_code=204, + ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -259,11 +264,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, _include=_include) - return ListResponse([ExecutionSchedule(item) - for item in response['items']], - response['metadata']) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(ExecutionSchedule), + ) def get(self, schedule_id, deployment_id, _include=None): """Get an execution schedule by its id. @@ -276,5 +282,9 @@ def get(self, schedule_id, deployment_id, _include=None): assert schedule_id params = {'deployment_id': deployment_id} uri = '/{self._uri_prefix}/{id}'.format(self=self, id=schedule_id) - response = self.api.get(uri, _include=_include, params=params) - return ExecutionSchedule(response) + return self.api.get( + uri, + _include=_include, + params=params, + wrapper=ExecutionSchedule, + ) diff --git a/cloudify_rest_client/executions.py b/cloudify_rest_client/executions.py index c8aa59b81..977ea31e9 100644 --- a/cloudify_rest_client/executions.py +++ b/cloudify_rest_client/executions.py @@ -190,9 +190,10 @@ def list(self, _include=None, **kwargs): response['metadata']) def get(self, execution_group_id): - response = self.api.get( - '/execution-groups/{0}'.format(execution_group_id)) - return ExecutionGroup(response) + return self.api.get( + '/execution-groups/{0}'.format(execution_group_id), + wrapper=ExecutionGroup, + ) def create(self, deployment_group_id, workflow_id, executions, force=False, default_parameters=None, parameters=None, @@ -218,8 +219,11 @@ def create(self, deployment_group_id, workflow_id, executions, args['created_by'] = created_by if created_at: args['created_at'] = created_at - response = self.api.post('/execution-groups', data=args) - return ExecutionGroup(response) + return self.api.post( + '/execution-groups', + data=args, + wrapper=ExecutionGroup, + ) def start(self, deployment_group_id, workflow_id, force=False, default_parameters=None, parameters=None, @@ -235,15 +239,18 @@ def start(self, deployment_group_id, workflow_id, force=False, the default parameters on a per-deployment basis :param concurrency: run this many executions at a time """ - response = self.api.post('/execution-groups', data={ - 'force': force, - 'deployment_group_id': deployment_group_id, - 'workflow_id': workflow_id, - 'parameters': parameters, - 'default_parameters': default_parameters, - 'concurrency': concurrency - }) - return ExecutionGroup(response) + return self.api.post( + '/execution-groups', + data={ + 'force': force, + 'deployment_group_id': deployment_group_id, + 'workflow_id': workflow_id, + 'parameters': parameters, + 'default_parameters': default_parameters, + 'concurrency': concurrency + }, + wrapper=ExecutionGroup, + ) def cancel(self, execution_group_id, force=False, kill=False): """Cancel the executions in this group. @@ -253,18 +260,20 @@ def cancel(self, execution_group_id, force=False, kill=False): Queued executions are marked cancelled immediately. """ action = 'kill' if kill else 'force-cancel' if force else 'cancel' - response = self.api.post( + return self.api.post( '/execution-groups/{0}'.format(execution_group_id), - data={'action': action}) - return ExecutionGroup(response) + data={'action': action}, + wrapper=ExecutionGroup, + ) def resume(self, execution_group_id, force=False): """Resume the executions in this group.""" action = 'force-resume' if force else 'resume' - response = self.api.post( + return self.api.post( '/execution-groups/{0}'.format(execution_group_id), - data={'action': action}) - return ExecutionGroup(response) + data={'action': action}, + wrapper=ExecutionGroup, + ) def set_target_group(self, execution_group_id, success_group=None, failed_group=None): @@ -281,14 +290,14 @@ def set_target_group(self, execution_group_id, :param success_group: ID of the target failure deployment group :return: The updated ExecutionGroup """ - response = self.api.patch( + return self.api.patch( '/execution-groups/{0}'.format(execution_group_id), data={ 'success_group_id': success_group, 'failure_group_id': failed_group, - } + }, + wrapper=ExecutionGroup, ) - return ExecutionGroup(response) class ExecutionsClient(object): @@ -296,7 +305,6 @@ class ExecutionsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'executions' - self._wrapper_cls = Execution def _create_filters( self, @@ -325,8 +333,7 @@ def should_start(self, execution_id): assert execution_id uri = '/{self._uri_prefix}/{id}/should-start'.format( self=self, id=execution_id) - response = self.api.get(uri) - return response + return self.api.get(uri) def list(self, _include=None, **kwargs): """Returns a list of executions. @@ -343,13 +350,11 @@ def list(self, _include=None, **kwargs): """ params = self._create_filters(**kwargs) - response = self.api.get( + return self.api.get( '/{self._uri_prefix}'.format(self=self), params=params, - _include=_include) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + _include=_include, + wrapper=ListResponse.of(Execution), ) def get(self, execution_id, _include=None): @@ -361,8 +366,7 @@ def get(self, execution_id, _include=None): """ assert execution_id uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) - response = self.api.get(uri, _include=_include) - return self._wrapper_cls(response) + return self.api.get(uri, _include=_include, wrapper=Execution) def update(self, execution_id, status, error=None): """Update execution with the provided status and optional error. @@ -377,8 +381,7 @@ def update(self, execution_id, status, error=None): params = {'status': status} if error: params['error'] = error - response = self.api.patch(uri, data=params) - return Execution(response) + return self.api.patch(uri, data=params, wrapper=Execution) def start(self, *args, **kwargs): """Starts a deployment's workflow execution whose id is provided. @@ -438,10 +441,12 @@ def create(self, deployment_id, workflow_id, parameters=None, 'error': error, } uri = '/executions' - response = self.api.post(uri, - data=data, - expected_status_code=201) - return Execution(response) + return self.api.post( + uri, + data=data, + expected_status_code=201, + wrapper=Execution, + ) def cancel(self, execution_id, force=False, kill=False): """Cancels an execution. @@ -454,10 +459,12 @@ def cancel(self, execution_id, force=False, kill=False): """ uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) action = 'kill' if kill else 'force-cancel' if force else 'cancel' - response = self.api.post(uri, - data={'action': action}, - expected_status_code=200) - return self._wrapper_cls(response) + return self.api.post( + uri, + data={'action': action}, + expected_status_code=200, + wrapper=Execution, + ) def resume(self, execution_id, force=False): """Resume an execution. @@ -469,10 +476,12 @@ def resume(self, execution_id, force=False): """ uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) action = 'force-resume' if force else 'resume' - response = self.api.post(uri, - data={'action': action}, - expected_status_code=200) - return self._wrapper_cls(response) + return self.api.post( + uri, + data={'action': action}, + expected_status_code=200, + wrapper=Execution, + ) def requeue(self, execution_id): """ @@ -482,10 +491,12 @@ def requeue(self, execution_id): :return: Requeued execution. """ uri = '/{self._uri_prefix}/{id}'.format(self=self, id=execution_id) - response = self.api.post(uri, - data={'action': 'requeue'}, - expected_status_code=200) - return self._wrapper_cls(response) + return self.api.post( + uri, + data={'action': 'requeue'}, + expected_status_code=200, + wrapper=Execution, + ) def delete(self, to_datetime=None, keep_last=None, **kwargs): """Deletes finished executions from the DB. @@ -503,8 +514,10 @@ def delete(self, to_datetime=None, keep_last=None, **kwargs): data['to_datetime'] = to_datetime.isoformat() if keep_last: data['keep_last'] = keep_last - response = self.api.delete('/{self._uri_prefix}'.format(self=self), - data=data, - params=kwargs, - expected_status_code=200) - return response['items'][0]['count'] + return self.api.delete( + '/{self._uri_prefix}'.format(self=self), + data=data, + params=kwargs, + expected_status_code=200, + wrapper=lambda response: response['items'][0]['count'], + ) diff --git a/cloudify_rest_client/filters.py b/cloudify_rest_client/filters.py index 894a3ffc9..1bbfb6bab 100644 --- a/cloudify_rest_client/filters.py +++ b/cloudify_rest_client/filters.py @@ -75,9 +75,11 @@ def create(self, data['created_at'] = created_at if created_by: data['created_by'] = created_by - response = self.api.put('{0}/{1}'.format(self.uri, filter_id), - data=data) - return Filter(response) + return self.api.put( + '{0}/{1}'.format(self.uri, filter_id), + data=data, + wrapper=Filter, + ) def list(self, sort=None, is_descending=False, **kwargs): """Returns a list of all filters. @@ -94,16 +96,20 @@ def list(self, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get(self.uri, params=params) - return ListResponse([Filter(item) for item in response['items']], - response['metadata']) + return self.api.get( + self.uri, + params=params, + wrapper=ListResponse.of(Filter), + ) def get(self, filter_id): - response = self.api.get('{0}/{1}'.format(self.uri, filter_id)) - return Filter(response) + return self.api.get( + '{0}/{1}'.format(self.uri, filter_id), + wrapper=Filter, + ) def delete(self, filter_id): - self.api.delete('{0}/{1}'.format(self.uri, filter_id)) + return self.api.delete('{0}/{1}'.format(self.uri, filter_id)) def update(self, filter_id, new_filter_rules=None, new_visibility=None): """Updates the filter's visibility or rules @@ -131,9 +137,11 @@ def update(self, filter_id, new_filter_rules=None, new_visibility=None): if new_filter_rules: data['filter_rules'] = new_filter_rules - response = self.api.patch('{0}/{1}'.format(self.uri, filter_id), - data=data) - return Filter(response) + return self.api.patch( + '{0}/{1}'.format(self.uri, filter_id), + data=data, + wrapper=Filter, + ) class BlueprintsFiltersClient(FiltersClient): diff --git a/cloudify_rest_client/inter_deployment_dependencies.py b/cloudify_rest_client/inter_deployment_dependencies.py index 171512b99..e78977061 100644 --- a/cloudify_rest_client/inter_deployment_dependencies.py +++ b/cloudify_rest_client/inter_deployment_dependencies.py @@ -56,13 +56,6 @@ class InterDeploymentDependencyClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'deployments/inter-deployment-dependencies' - self._wrapper_cls = InterDeploymentDependency - - def _wrap_list(self, response): - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) def create(self, dependency_creator, source_deployment, target_deployment=None, @@ -93,9 +86,11 @@ def create(self, dependency_creator, source_deployment, target_deployment_func, external_source, external_target) - response = self.api.put( - '/{self._uri_prefix}'.format(self=self), data=data) - return self._wrapper_cls(response) + return self.api.put( + '/{self._uri_prefix}'.format(self=self), + data=data, + wrapper=InterDeploymentDependency, + ) def create_many(self, source_deployment_id, inter_deployment_dependencies): """Creates a number of inter-deployment dependencies. @@ -106,12 +101,14 @@ def create_many(self, source_deployment_id, inter_deployment_dependencies): dependencies descriptions, but without a source_deployment(_id). :return: a list of created InterDeploymentDependencies IDs. """ - response = self.api.post( - '/{self._uri_prefix}'.format(self=self), data={ + return self.api.post( + '/{self._uri_prefix}'.format(self=self), + data={ 'source_deployment_id': source_deployment_id, - 'inter_deployment_dependencies': inter_deployment_dependencies} + 'inter_deployment_dependencies': inter_deployment_dependencies + }, + wrapper=ListResponse.of(InterDeploymentDependency), ) - return self._wrap_list(response) def update_all(self, source_deployment_id, inter_deployment_dependencies): """Update (i.e. rewrite all) inter-deployment dependencies for @@ -123,14 +120,14 @@ def update_all(self, source_deployment_id, inter_deployment_dependencies): dependencies descriptions, but without a source_deployment(_id). :return: a list of created InterDeploymentDependencies IDs. """ - response = self.api.put( + return self.api.put( '/deployments/{0}/inter-deployment-dependencies'.format( source_deployment_id), data={ 'inter_deployment_dependencies': inter_deployment_dependencies, }, + wrapper=ListResponse.of(InterDeploymentDependency), ) - return self._wrap_list(response) def delete(self, dependency_creator, source_deployment, target_deployment=None, @@ -162,7 +159,10 @@ def delete(self, dependency_creator, source_deployment, external_source=external_source, external_target=external_target) data['is_component_deletion'] = is_component_deletion - self.api.delete('/{self._uri_prefix}'.format(self=self), data=data) + return self.api.delete( + '/{self._uri_prefix}'.format(self=self), + data=data, + ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -179,10 +179,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=params) - return self._wrap_list(response) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=params, + wrapper=ListResponse.of(InterDeploymentDependency), + ) def restore(self, deployment_id, update_service_composition): """ @@ -194,5 +196,7 @@ def restore(self, deployment_id, update_service_composition): 'deployment_id': deployment_id, 'update_service_composition': update_service_composition, } - self.api.post('/{self._uri_prefix}/restore'.format(self=self), - data=data) + return self.api.post( + '/{self._uri_prefix}/restore'.format(self=self), + data=data, + ) diff --git a/cloudify_rest_client/labels.py b/cloudify_rest_client/labels.py index a2bdd423a..5106fe92a 100644 --- a/cloudify_rest_client/labels.py +++ b/cloudify_rest_client/labels.py @@ -32,8 +32,10 @@ def list_keys(self): """ Returns all defined label keys, from all elements of the resource. """ - response = self.api.get('/labels/{0}'.format(self.resource_name)) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + '/labels/{0}'.format(self.resource_name), + wrapper=ListResponse.of(lambda x: x), + ) def list_key_values(self, label_key): """ @@ -41,15 +43,18 @@ def list_key_values(self, label_key): :param label_key: The resource labels' key to list the values for. """ - response = self.api.get( - '/labels/{0}/{1}'.format(self.resource_name, label_key)) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + '/labels/{0}/{1}'.format(self.resource_name, label_key), + wrapper=ListResponse.of(lambda x: x), + ) def get_reserved_labels_keys(self): """Returns the reserved labels keys (`csys-` prefixed).""" - response = self.api.get('/labels/{0}'.format(self.resource_name), - params={'_reserved': True}) - return ListResponse(response['items'], response['metadata']) + return self.api.get( + '/labels/{0}'.format(self.resource_name), + params={'_reserved': True}, + wrapper=ListResponse.of(lambda x: x), + ) class DeploymentsLabelsClient(_LabelsClient): diff --git a/cloudify_rest_client/ldap.py b/cloudify_rest_client/ldap.py index 31d153949..9283b9a80 100644 --- a/cloudify_rest_client/ldap.py +++ b/cloudify_rest_client/ldap.py @@ -119,8 +119,7 @@ def set(self, with open(ldap_ca_path) as cert_handle: params['ldap_ca_cert'] = cert_handle.read() uri = '/ldap' - response = self.api.post(uri, params) - return LdapResponse(response) + return self.api.post(uri, params, wrapper=LdapResponse) def get_status(self): uri = '/ldap' diff --git a/cloudify_rest_client/license.py b/cloudify_rest_client/license.py index 33e30ad77..1d57eeabd 100644 --- a/cloudify_rest_client/license.py +++ b/cloudify_rest_client/license.py @@ -70,7 +70,6 @@ class LicenseClient(object): def __init__(self, api): self.api = api - self._wrapper_cls = License def check(self): """Check license state of manager is healthy. @@ -78,19 +77,14 @@ def check(self): If this is not the case, the following exception will be thrown: cloudify_rest_client.exceptions.MissingCloudifyLicense """ - self.api.get('/license-check') + return self.api.get('/license-check') def list(self): """Get the Cloudify license from the Manager. :rtype: License """ - response = self.api.get('/license') - - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) + return self.api.get('/license', wrapper=ListResponse.of(License)) def upload(self, license_path): """Uploads a Cloudify license the Manager @@ -103,14 +97,12 @@ def upload(self, license_path): license_path, client=self.api) - response = self.api.put( + return self.api.put( '/license', data=data ) - return response - def delete(self): """Remove the the Cloudify license from the Manager. """ - self.api.delete('/license') + return self.api.delete('/license') diff --git a/cloudify_rest_client/log_bundles.py b/cloudify_rest_client/log_bundles.py index 056a9c428..26c780e50 100644 --- a/cloudify_rest_client/log_bundles.py +++ b/cloudify_rest_client/log_bundles.py @@ -52,8 +52,7 @@ def get(self, log_bundle_id, _include=None): :return: LogBundle. """ uri = self.base_url + log_bundle_id - response = self.api.get(uri, _include=_include) - return LogBundle(response) + return self.api.get(uri, _include=_include, wrapper=LogBundle) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """Returns a list of currently stored log bundles. @@ -68,10 +67,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get(self.base_url.rstrip('/'), - params=params, _include=_include) - return ListResponse([LogBundle(item) for item in response['items']], - response['metadata']) + return self.api.get( + self.base_url.rstrip('/'), + params=params, + _include=_include, + wrapper=ListResponse.of(LogBundle), + ) def create(self, log_bundle_id, @@ -84,15 +85,19 @@ def create(self, """ uri = self.base_url + log_bundle_id params = {'queue': queue} - response = self.api.put(uri, data=params, expected_status_code=201) - return Execution(response) + return self.api.put( + uri, + data=params, + expected_status_code=201, + wrapper=Execution, + ) def delete(self, log_bundle_id): """Deletes the log bundle whose id matches the provided log bundle id. :param log_bundle_id: The id of the log bundle to be deleted. """ uri = self.base_url + log_bundle_id - self.api.delete(uri) + return self.api.delete(uri) def download(self, log_bundle_id, output_file, progress_callback=None): """Downloads a previously created log bundle from a manager. @@ -104,10 +109,10 @@ def download(self, log_bundle_id, output_file, progress_callback=None): """ uri = self.base_url + '{}/archive'.format(log_bundle_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) - return output_file def update_status(self, log_bundle_id, status, error=None): @@ -120,4 +125,4 @@ def update_status(self, log_bundle_id, status, error=None): params = {'status': status} if error: params['error'] = error - self.api.patch(uri, data=params) + return self.api.patch(uri, data=params) diff --git a/cloudify_rest_client/maintenance.py b/cloudify_rest_client/maintenance.py index e2e7992de..8ef6c6017 100644 --- a/cloudify_rest_client/maintenance.py +++ b/cloudify_rest_client/maintenance.py @@ -72,8 +72,7 @@ def status(self): :return: Maintenance mode state. """ uri = '/maintenance' - response = self.api.get(uri) - return Maintenance(response) + return self.api.get(uri, wrapper=Maintenance) def activate(self): """ @@ -83,11 +82,10 @@ def activate(self): """ uri = '/maintenance/activate' try: - response = self.api.post(uri) + return self.api.post(uri, wrapper=Maintenance) except NotModifiedError as e: e.message = 'Maintenance mode is already on.' raise - return Maintenance(response) def deactivate(self): """ @@ -97,8 +95,7 @@ def deactivate(self): """ uri = '/maintenance/deactivate' try: - response = self.api.post(uri) + return self.api.post(uri, wrapper=Maintenance) except NotModifiedError as e: e.message = 'Maintenance mode is already off.' raise - return Maintenance(response) diff --git a/cloudify_rest_client/manager.py b/cloudify_rest_client/manager.py index a18865c37..42f915326 100644 --- a/cloudify_rest_client/manager.py +++ b/cloudify_rest_client/manager.py @@ -262,8 +262,7 @@ def get_status(self): """ :return: Cloudify's management machine status. """ - response = self.api.get('/status') - return response + return self.api.get('/status') def get_config(self, name=None, scope=None): """Get configuration of the manager. @@ -272,18 +271,27 @@ def get_config(self, name=None, scope=None): provided, return all values for that scope. """ if name and scope: - response = self.api.get('/config/{0}.{1}'.format(scope, name)) - return ConfigItem(response) + return self.api.get( + '/config/{0}.{1}'.format(scope, name), + wrapper=ConfigItem, + ) if name: - response = self.api.get('/config/{0}'.format(name)) - return ConfigItem(response) + return self.api.get( + '/config/{0}'.format(name), + wrapper=ConfigItem, + ) if scope: - response = self.api.get('/config', params={'scope': scope}) + return self.api.get( + '/config', + params={'scope': scope}, + wrapper=ListResponse.of(ConfigItem), + ) else: - response = self.api.get('/config') - return ListResponse([ConfigItem(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/config', + wrapper=ListResponse.of(ConfigItem), + ) def put_config(self, name, value, force=False): """Update a given setting. @@ -292,11 +300,14 @@ def put_config(self, name, value, force=False): :param force: Force changing non-editable settings """ - response = self.api.put('/config/{0}'.format(name), data={ - 'value': value, - 'force': force - }) - return ConfigItem(response) + return self.api.put( + '/config/{0}'.format(name), + data={ + 'value': value, + 'force': force + }, + wrapper=ConfigItem, + ) def add_manager(self, hostname, private_ip, public_ip, version, edition, distribution, distro_release, @@ -320,8 +331,7 @@ def add_manager(self, hostname, private_ip, public_ip, version, manager['fs_sync_node_id'] = fs_sync_node_id if networks: manager['networks'] = networks - response = self.api.post('/managers', data=manager) - return ManagerItem(response) + return self.api.post('/managers', data=manager, wrapper=ManagerItem) def remove_manager(self, hostname): """ @@ -331,7 +341,7 @@ def remove_manager(self, hostname): the cluster, not necessarily for uninstalling the manager :param hostname: The manager's hostname """ - self.api.delete('/managers/{0}'.format(hostname)) + return self.api.delete('/managers/{0}'.format(hostname)) def update_manager(self, hostname, fs_sync_node_id, bootstrap_cluster): """ @@ -342,11 +352,14 @@ def update_manager(self, hostname, fs_sync_node_id, bootstrap_cluster): :param bootstrap_cluster: Whether it is the 1st manager in the cluster or not """ - response = self.api.put('/managers/{0}'.format(hostname), data={ - 'fs_sync_node_id': fs_sync_node_id, - 'bootstrap_cluster': bootstrap_cluster - }) - return ManagerItem(response) + return self.api.put( + '/managers/{0}'.format(hostname), + data={ + 'fs_sync_node_id': fs_sync_node_id, + 'bootstrap_cluster': bootstrap_cluster + }, + wrapper=ManagerItem, + ) def get_managers(self, hostname=None, _include=None): """ @@ -356,14 +369,18 @@ def get_managers(self, hostname=None, _include=None): :param _include: list of columns to include in the returned list """ if hostname: - response = self.api.get('/managers', params={'hostname': hostname}, - _include=_include) + return self.api.get( + '/managers', + params={'hostname': hostname}, + _include=_include, + wrapper=ListResponse.of(ManagerItem), + ) else: - response = self.api.get('/managers', _include=_include) - return ListResponse( - [ManagerItem(item) for item in response['items']], - response['metadata'] - ) + return self.api.get( + '/managers', + _include=_include, + wrapper=ListResponse.of(ManagerItem), + ) def add_broker(self, name, address, port=None, networks=None): """Add a broker to the brokers table. @@ -390,8 +407,11 @@ def add_broker(self, name, address, port=None, networks=None): params['port'] = port if networks: params['networks'] = networks - response = self.api.post('/brokers', data=params) - return RabbitMQBrokerItem(response) + return self.api.post( + '/brokers', + data=params, + wrapper=RabbitMQBrokerItem, + ) def remove_broker(self, name): """Remove a broker from the brokers table. @@ -404,7 +424,7 @@ def remove_broker(self, name): :return: The broker that was deleted. """ - self.api.delete('/brokers/{0}'.format(name)) + return self.api.delete('/brokers/{0}'.format(name)) def update_broker(self, name, networks): """Update a broker. @@ -417,16 +437,18 @@ def update_broker(self, name, networks): :return: The updated broker. """ - response = self.api.put('/brokers/{0}'.format(name), data={ - 'networks': networks, - }) - return RabbitMQBrokerItem(response) + return self.api.put( + '/brokers/{0}'.format(name), + data={ + 'networks': networks, + }, + wrapper=RabbitMQBrokerItem, + ) def get_brokers(self): - response = self.api.get('/brokers',) - return ListResponse( - [RabbitMQBrokerItem(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/brokers', + wrapper=ListResponse.of(RabbitMQBrokerItem), ) def update_db_nodes(self): @@ -435,25 +457,20 @@ def update_db_nodes(self): :return: A list of DB nodes in the cluster. """ params = {'action': 'update'} - response = self.api.post('/db-nodes', data=params) - return ListResponse( - [DBNodeItem(item) for item in response['items']], - response['metadata'] + return self.api.post( + '/db-nodes', + data=params, + wrapper=ListResponse.of(DBNodeItem), ) def get_db_nodes(self): - response = self.api.get('/db-nodes') - return ListResponse( - [DBNodeItem(item) for item in response['items']], - response['metadata'] - ) + return self.api.get('/db-nodes', wrapper=ListResponse.of(DBNodeItem)) def get_version(self): """ :return: Cloudify's management machine version information. """ - response = self.api.get('/version', versioned_url=False) - return response + return self.api.get('/version', versioned_url=False) def get_context(self, _include=None): """ @@ -464,8 +481,7 @@ def get_context(self, _include=None): :param _include: List of fields to include in response. :return: Context stored in manager. """ - response = self.api.get('/provider/context', _include=_include) - return response + return self.api.get('/provider/context', _include=_include) def create_context(self, name, context): """ @@ -478,11 +494,11 @@ def create_context(self, name, context): :param context: Context as dict. :return: Create context result. """ - data = {'name': name, 'context': context} - response = self.api.post('/provider/context', - data, - expected_status_code=201) - return response + return self.api.post( + '/provider/context', + data={'name': name, 'context': context}, + expected_status_code=201, + ) def update_context(self, name, context): @@ -497,9 +513,9 @@ def update_context(self, name, context): :param context: Context as dict. """ - - data = {'name': name, 'context': context} - response = self.api.post('/provider/context', data, - expected_status_code=200, - params={'update': 'true'}) - return response + return self.api.post( + '/provider/context', + data={'name': name, 'context': context}, + expected_status_code=200, + params={'update': 'true'}, + ) diff --git a/cloudify_rest_client/node_instances.py b/cloudify_rest_client/node_instances.py index 5c4875cac..a9340657c 100644 --- a/cloudify_rest_client/node_instances.py +++ b/cloudify_rest_client/node_instances.py @@ -134,10 +134,8 @@ def has_configuration_drift(self): class NodeInstancesClient(object): - def __init__(self, api): self.api = api - self._wrapper_cls = NodeInstance self._uri_prefix = 'node-instances' def create_many(self, deployment_id, node_instances): @@ -149,7 +147,7 @@ def create_many(self, deployment_id, node_instances): keys: id, node_id. :return: None """ - self.api.post( + return self.api.post( '/{self._uri_prefix}'.format(self=self), data={ 'deployment_id': deployment_id, @@ -168,10 +166,12 @@ def get(self, node_instance_id, _include=None, evaluate_functions=False): :return: The retrieved node instance. """ assert node_instance_id - uri = '/{self._uri_prefix}/{id}'.format(self=self, id=node_instance_id) - params = {'_evaluate_functions': evaluate_functions} - response = self.api.get(uri, params=params, _include=_include) - return self._wrapper_cls(response) + return self.api.get( + '/{self._uri_prefix}/{id}'.format(self=self, id=node_instance_id), + params={'_evaluate_functions': evaluate_functions}, + _include=_include, + wrapper=NodeInstance, + ) def update(self, node_instance_id, @@ -212,8 +212,12 @@ def update(self, params = {} if force: params['force'] = True - response = self.api.patch(uri, params=params, data=data) - return NodeInstance(response) + return self.api.patch( + uri, + params=params, + data=data, + wrapper=NodeInstance, + ) def _create_filters( self, @@ -266,23 +270,22 @@ def list(self, _include=None, constraints=None, **kwargs): params = self._create_filters(**kwargs) if constraints is None: - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, - _include=_include) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(NodeInstance), + ) else: if _include: params['_include'] = ','.join(_include) - response = self.api.post( + return self.api.post( '/searches/{self._uri_prefix}'.format(self=self), params=params, - data={'constraints': constraints} + data={'constraints': constraints}, + wrapper=ListResponse.of(NodeInstance), ) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) - def search(self, ids, all_tenants=False): """Search node instances by their IDs. @@ -294,17 +297,18 @@ def search(self, ids, all_tenants=False): params = {} if all_tenants: params['_all_tenants'] = True - response = self.api.post('/searches/node-instances', data={ - 'filter_rules': [{ - 'key': 'id', - 'values': ids, - 'operator': 'any_of', - 'type': 'attribute' - }] - }, params=params) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.post( + '/searches/node-instances', + data={ + 'filter_rules': [{ + 'key': 'id', + 'values': ids, + 'operator': 'any_of', + 'type': 'attribute' + }] + }, + params=params, + wrapper=ListResponse.of(NodeInstance), ) def delete(self, instance_id): @@ -316,7 +320,7 @@ def delete(self, instance_id): :param instance_id: ID of the instance to be deleted """ - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{instance_id}' .format(self=self, instance_id=instance_id), expected_status_code=204, diff --git a/cloudify_rest_client/nodes.py b/cloudify_rest_client/nodes.py index d3ea7c8e0..7c39271da 100644 --- a/cloudify_rest_client/nodes.py +++ b/cloudify_rest_client/nodes.py @@ -173,10 +173,8 @@ def type(self): class NodesClient(object): - def __init__(self, api): self.api = api - self._wrapper_cls = Node self._uri_prefix = 'nodes' self.types = NodeTypesClient(api) @@ -203,7 +201,7 @@ def _create_filters( return params def list(self, _include=None, filter_rules=None, constraints=None, - **kwargs): + wrapper=None, **kwargs): """ Returns a list of nodes which belong to the deployment identified by the provided deployment id. @@ -227,32 +225,32 @@ def list(self, _include=None, filter_rules=None, constraints=None, 'provide either filter_rules or DSL constraints, not both') params = self._create_filters(**kwargs) + wrapper = wrapper or ListResponse.of(Node) if filter_rules is not None: if _include: params['_include'] = ','.join(_include) - response = self.api.post( + return self.api.post( '/searches/{self._uri_prefix}'.format(self=self), params=params, - data={'filter_rules': filter_rules} + data={'filter_rules': filter_rules}, + wrapper=wrapper, ) elif constraints is not None: if _include: params['_include'] = ','.join(_include) - response = self.api.post( + return self.api.post( '/searches/{self._uri_prefix}'.format(self=self), params=params, - data={'constraints': constraints} + data={'constraints': constraints}, + wrapper=wrapper, ) else: - response = self.api.get( + return self.api.get( '/{self._uri_prefix}'.format(self=self), params=params, - _include=_include + _include=_include, + wrapper=wrapper ) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) def get(self, deployment_id, node_id, _include=None, evaluate_functions=False): @@ -269,14 +267,19 @@ def get(self, deployment_id, node_id, _include=None, """ assert deployment_id assert node_id - result = self.list(deployment_id=deployment_id, - id=node_id, - _include=_include, - evaluate_functions=evaluate_functions) - if not result: - return None - else: - return result[0] + + def _get_single_node(response): + if not response.get('items'): + return None + return Node(response['items'][0]) + + return self.list( + deployment_id=deployment_id, + id=node_id, + _include=_include, + evaluate_functions=evaluate_functions, + wrapper=_get_single_node, + ) def create_many(self, deployment_id, nodes): """Create multiple nodes. @@ -286,7 +289,7 @@ def create_many(self, deployment_id, nodes): Each node dict must contain at least the keys: id, type. :return: None """ - self.api.post( + return self.api.post( '/{self._uri_prefix}'.format(self=self), data={ 'deployment_id': deployment_id, @@ -306,7 +309,7 @@ def update(self, deployment_id, node_id, **kwargs): :param node_id: The node id within the given deployment :param kwargs: The new node attributes """ - self.api.patch( + return self.api.patch( '/{self._uri_prefix}/{deployment_id}/{node_id}' .format(self=self, deployment_id=deployment_id, node_id=node_id), data=kwargs, @@ -319,7 +322,7 @@ def delete(self, deployment_id, node_id): :param deployment_id: The deployment the node belongs to :param node_id: The node id within the given deployment """ - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{deployment_id}/{node_id}' .format(self=self, deployment_id=deployment_id, node_id=node_id), expected_status_code=204, @@ -353,9 +356,8 @@ def list(self, node_type=None, constraints=None, **kwargs): if constraints is None: constraints = dict() - response = self.api.post('/searches/node-types', params=params, - data={'constraints': constraints}) - return ListResponse( - items=[NodeTypes(item) for item in response['items']], - metadata=response['metadata'] + return self.api.post( + '/searches/node-types', params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(NodeTypes), ) diff --git a/cloudify_rest_client/operations.py b/cloudify_rest_client/operations.py index dd9661642..6da2c891d 100644 --- a/cloudify_rest_client/operations.py +++ b/cloudify_rest_client/operations.py @@ -50,7 +50,6 @@ class OperationsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'operations' - self._wrapper_cls = Operation def list( self, @@ -90,16 +89,18 @@ def list( params['_offset'] = _offset if _size is not None: params['_size'] = _size - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, _include=_include) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(Operation), + ) def get(self, operation_id): - response = self.api.get('/{self._uri_prefix}/{id}' - .format(self=self, id=operation_id)) - return Operation(response) + return self.api.get( + '/{self._uri_prefix}/{id}'.format(self=self, id=operation_id), + wrapper=Operation, + ) def create(self, operation_id, graph_id, name, type, parameters, dependencies): @@ -111,14 +112,18 @@ def create(self, operation_id, graph_id, name, type, parameters, 'parameters': parameters } uri = '/operations/{0}'.format(operation_id) - response = self.api.put(uri, data=data, expected_status_code=201) - return Operation(response) + return self.api.put( + uri, + data=data, + expected_status_code=201, + wrapper=Operation, + ) def update(self, operation_id, state, result=None, exception=None, exception_causes=None, manager_name=None, agent_name=None): uri = '/operations/{0}'.format(operation_id) - self.api.patch(uri, data={ + return self.api.patch(uri, data={ 'state': state, 'result': result, 'exception': exception, @@ -144,7 +149,7 @@ def _update_operation_inputs(self, deployment_id=None, node_id=None, :param rel_index: when updating relationship operations, look at the relationship at this index """ - self.api.post('/operations', data={ + return self.api.post('/operations', data={ 'action': 'update-stored', 'deployment_id': deployment_id, 'node_id': node_id, @@ -154,8 +159,7 @@ def _update_operation_inputs(self, deployment_id=None, node_id=None, }, expected_status_code=(200, 204)) def delete(self, operation_id): - uri = '/operations/{0}'.format(operation_id) - self.api.delete(uri) + return self.api.delete('/operations/{0}'.format(operation_id)) class TasksGraph(dict): @@ -179,17 +183,17 @@ class TasksGraphClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'tasks_graphs' - self._wrapper_cls = TasksGraph def list(self, execution_id, name=None, _include=None): params = {'execution_id': execution_id} if name: params['name'] = name - response = self.api.get('/{self._uri_prefix}'.format(self=self), - params=params, _include=_include) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + params=params, + _include=_include, + wrapper=ListResponse.of(TasksGraph), + ) def create(self, execution_id, name, operations=None, created_at=None, graph_id=None): @@ -203,10 +207,17 @@ def create(self, execution_id, name, operations=None, created_at=None, if graph_id: params['graph_id'] = graph_id uri = '/{self._uri_prefix}/tasks_graphs'.format(self=self) - response = self.api.post(uri, data=params, expected_status_code=201) - return TasksGraph(response) + return self.api.post( + uri, + data=params, + expected_status_code=201, + wrapper=TasksGraph, + ) def update(self, tasks_graph_id, state): uri = '/tasks_graphs/{0}'.format(tasks_graph_id) - response = self.api.patch(uri, data={'state': state}) - return TasksGraph(response) + return self.api.patch( + uri, + data={'state': state}, + wrapper=TasksGraph, + ) diff --git a/cloudify_rest_client/permissions.py b/cloudify_rest_client/permissions.py index 4bedd34b6..b6021e5e2 100644 --- a/cloudify_rest_client/permissions.py +++ b/cloudify_rest_client/permissions.py @@ -10,7 +10,6 @@ def __init__(self, permission): class PermissionsClient(object): def __init__(self, api): self.api = api - self._wrapper_cls = Permission self._uri_prefix = '/permissions' def list(self, role=None): @@ -22,11 +21,7 @@ def list(self, role=None): url = self._uri_prefix if role: url = '{0}/{1}'.format(url, role) - response = self.api.get(url) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] - ) + return self.api.get(url, wrapper=ListResponse.of(Permission)) def add(self, permission, role): """Allow role the specified permission @@ -34,7 +29,9 @@ def add(self, permission, role): :param permission: the permission name to allow :param role: the role name """ - self.api.put('{0}/{1}/{2}'.format(self._uri_prefix, role, permission)) + return self.api.put( + '{0}/{1}/{2}'.format(self._uri_prefix, role, permission), + ) def delete(self, permission, role): """Disallow role the specified permission @@ -42,5 +39,5 @@ def delete(self, permission, role): :param permission: the permission name to disallow :param role: the role name """ - self.api.delete( + return self.api.delete( '{0}/{1}/{2}'.format(self._uri_prefix, role, permission)) diff --git a/cloudify_rest_client/plugins.py b/cloudify_rest_client/plugins.py index 3f5042e3c..34184f3bf 100644 --- a/cloudify_rest_client/plugins.py +++ b/cloudify_rest_client/plugins.py @@ -208,7 +208,6 @@ class PluginsClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'plugins' - self._wrapper_cls = Plugin def get(self, plugin_id, _include=None, **kwargs): """ @@ -219,14 +218,11 @@ def get(self, plugin_id, _include=None, **kwargs): :return: The plugin details. """ assert plugin_id - uri = '/{self._uri_prefix}/{id}'.format(self=self, id=plugin_id) - response = self.api.get(uri, _include=_include, params=kwargs) - return self._wrapper_cls(response) - - def _wrap_list(self, response): - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/{self._uri_prefix}/{id}'.format(self=self, id=plugin_id), + _include=_include, + params=kwargs, + wrapper=Plugin, ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): @@ -243,10 +239,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=params) - return self._wrap_list(response) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=params, + wrapper=ListResponse.of(Plugin), + ) def delete(self, plugin_id, force=False): """ @@ -260,7 +258,7 @@ def delete(self, plugin_id, force=False): data = { 'force': force } - self.api.delete('/plugins/{0}'.format(plugin_id), data=data) + return self.api.delete('/plugins/{0}'.format(plugin_id), data=data) def upload(self, plugin_path, @@ -307,18 +305,21 @@ def upload(self, progress_callback=progress_callback, client=self.api) - response = self.api.post( + return self.api.post( '/{self._uri_prefix}'.format(self=self), params=query_params, data=data, timeout=timeout, - expected_status_code=201 + expected_status_code=201, + wrapper=self._wrap_plugins_list ) + + def _wrap_plugins_list(self, response): if 'metadata' in response and 'items' in response: # This is a list of plugins - for caravan - return self._wrap_list(response) + return ListResponse.of(Plugin)(response) else: - return self._wrapper_cls(response) + return Plugin(response) def download(self, plugin_id, output_file, progress_callback=None): """Downloads a previously uploaded plugin archive from the manager @@ -330,6 +331,7 @@ def download(self, plugin_id, output_file, progress_callback=None): :return: The file path of the downloaded plugin. """ uri = '/plugins/{0}/archive'.format(plugin_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) @@ -346,6 +348,7 @@ def download_yaml(self, plugin_id, output_file, progress_callback=None): :return: The file path of the downloaded plugin yaml. """ uri = '/plugins/{0}/yaml'.format(plugin_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) @@ -363,6 +366,7 @@ def get_yaml(self, plugin_id, dsl_version=None, progress_callback=None): """ params = {'dsl_version': dsl_version} if dsl_version else {} uri = '/plugins/{0}/yaml'.format(plugin_id) + # TODO this is not async-friendly with tempfile.TemporaryDirectory() as tmpdir: output_file = os.path.join(tmpdir, 'plugin.yaml') response = self.api.get(uri, params=params, stream=True) @@ -404,7 +408,7 @@ def set_global(self, plugin_id): data = {'visibility': VisibilityState.GLOBAL} return self.api.patch( '/plugins/{0}/set-visibility'.format(plugin_id), - data=data + data=data, ) def set_visibility(self, plugin_id, visibility): @@ -419,7 +423,7 @@ def set_visibility(self, plugin_id, visibility): data = {'visibility': visibility} return self.api.patch( '/plugins/{0}/set-visibility'.format(plugin_id), - data=data + data=data, ) def install(self, plugin_id, managers=None, agents=None): @@ -439,8 +443,11 @@ def install(self, plugin_id, managers=None, agents=None): data['managers'] = managers if agents: data['agents'] = agents - response = self.api.post('/plugins/{0}'.format(plugin_id), data=data) - return Plugin(response) + return self.api.post( + '/plugins/{0}'.format(plugin_id), + data=data, + wrapper=Plugin, + ) def set_state(self, plugin_id, state, agent_name=None, manager_name=None, error=None): @@ -460,16 +467,23 @@ def set_state(self, plugin_id, state, agent_name=None, data['manager'] = manager_name if error: data['error'] = error - response = self.api.put('/plugins/{0}'.format(plugin_id), data=data) - return Plugin(response) + return self.api.put( + '/plugins/{0}'.format(plugin_id), + data=data, + wrapper=Plugin, + ) def set_owner(self, plugin_id, creator): """Change ownership of the plugin.""" - response = self.api.patch('/plugins/{0}'.format(plugin_id), - data={'creator': creator}) - return Plugin(response) + return self.api.patch( + '/plugins/{0}'.format(plugin_id), + data={'creator': creator}, + wrapper=Plugin, + ) def update(self, plugin_id, **kwargs): - response = self.api.patch('/plugins/{0}'.format(plugin_id), - data=kwargs) - return Plugin(response) + return self.api.patch( + '/plugins/{0}'.format(plugin_id), + data=kwargs, + wrapper=Plugin, + ) diff --git a/cloudify_rest_client/plugins_update.py b/cloudify_rest_client/plugins_update.py index fa280373b..4f6871c27 100644 --- a/cloudify_rest_client/plugins_update.py +++ b/cloudify_rest_client/plugins_update.py @@ -57,7 +57,6 @@ class PluginsUpdateClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'plugins-updates' - self._wrapper_cls = PluginsUpdate def get(self, plugins_update_id, _include=None, **kwargs): """ @@ -70,13 +69,11 @@ def get(self, plugins_update_id, _include=None, **kwargs): assert plugins_update_id uri = '/{self._uri_prefix}/{id}'.format( self=self, id=plugins_update_id) - response = self.api.get(uri, _include=_include, params=kwargs) - return self._wrapper_cls(response) - - def _wrap_list(self, response): - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + uri, + _include=_include, + params=kwargs, + wrapper=PluginsUpdate, ) def list(self, _include=None, sort=None, is_descending=False, **kwargs): @@ -94,10 +91,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=params) - return self._wrap_list(response) + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=params, + wrapper=ListResponse.of(PluginsUpdate), + ) def inject(self, blueprint_id, force=False, created_by=None, created_at=None, @@ -105,7 +104,7 @@ def inject(self, blueprint_id, force=False, update_id=None, affected_deployments=None, deployments_per_tenants=None, all_tenants=None, temp_blueprint_id=None): - return PluginsUpdate(self.api.post( + return self.api.post( '/{self._uri_prefix}/{}/update/initiate'.format(blueprint_id, self=self), data=_data_from_kwargs( @@ -120,7 +119,8 @@ def inject(self, blueprint_id, force=False, deployments_per_tenants=deployments_per_tenants, temp_blueprint_id=temp_blueprint_id, ), - )) + wrapper=PluginsUpdate, + ) def update_plugins(self, blueprint_id, force=False, plugin_names=None, to_latest=None, all_to_latest=True, @@ -162,7 +162,7 @@ def update_plugins(self, blueprint_id, force=False, plugin_names=None, RuntimeWarning) else: mapping = {} - response = self.api.post( + return self.api.post( '/{self._uri_prefix}/{}/update/initiate'.format(blueprint_id, self=self), data=_data_from_kwargs( @@ -176,9 +176,9 @@ def update_plugins(self, blueprint_id, force=False, plugin_names=None, auto_correct_types=auto_correct_types, reevaluate_active_statuses=reevaluate_active_statuses, all_tenants=all_tenants - ) + ), + wrapper=PluginsUpdate, ) - return PluginsUpdate(response) def finalize_plugins_update(self, plugins_update_id): """ @@ -186,11 +186,13 @@ def finalize_plugins_update(self, plugins_update_id): :return: a PluginUpdate object. """ - response = self.api.post( - '/{self._uri_prefix}/{}/update/finalize'.format(plugins_update_id, - self=self) + return self.api.post( + '/{self._uri_prefix}/{}/update/finalize'.format( + plugins_update_id, + self=self, + ), + wrapper=PluginsUpdate, ) - return PluginsUpdate(response) def _data_from_kwargs(**kwargs): diff --git a/cloudify_rest_client/responses.py b/cloudify_rest_client/responses.py index ea7957181..aca9d999a 100644 --- a/cloudify_rest_client/responses.py +++ b/cloudify_rest_client/responses.py @@ -58,6 +58,12 @@ def total(self): class ListResponse(object): + @classmethod + def of(cls, item_cls): + return lambda response_json: cls( + items=[item_cls(item) for item in response_json.get('items', [])], + metadata=response_json.get('metadata', {}), + ) def __init__(self, items, metadata): self.items = items diff --git a/cloudify_rest_client/secrets.py b/cloudify_rest_client/secrets.py index ba965452e..25c43e6f9 100644 --- a/cloudify_rest_client/secrets.py +++ b/cloudify_rest_client/secrets.py @@ -77,7 +77,6 @@ def provider_name(self): class SecretsClient(object): - def __init__(self, api): self.api = api @@ -125,8 +124,11 @@ def create(self, if provider: data['provider'] = provider - response = self.api.put('/secrets/{0}'.format(key), data=data) - return Secret(response) + return self.api.put( + '/secrets/{0}'.format(key), + data=data, + wrapper=Secret, + ) def update( self, @@ -144,12 +146,14 @@ def update( 'provider': provider, }) data = dict((k, v) for k, v in kwargs.items() if v is not None) - response = self.api.patch('/secrets/{0}'.format(key), data=data) - return Secret(response) + return self.api.patch( + '/secrets/{0}'.format(key), + data=data, + wrapper=Secret, + ) def get(self, key): - response = self.api.get('/secrets/{0}'.format(key)) - return Secret(response) + return self.api.get('/secrets/{0}'.format(key), wrapper=Secret) def export(self, _include=None, **kwargs): """ @@ -159,9 +163,11 @@ def export(self, _include=None, **kwargs): :return: Secrets' list """ params = kwargs - response = self.api.get('/secrets/share/export', params=params, - _include=_include) - return response + return self.api.get( + '/secrets/share/export', + params=params, + _include=_include, + ) def import_secrets(self, secrets_list, tenant_map=None, passphrase=None, override_collisions=False): @@ -183,8 +189,10 @@ def import_secrets(self, secrets_list, tenant_map=None, 'override_collisions': override_collisions } data = dict((k, v) for k, v in data.items() if v is not None) - response = self.api.post('/secrets/share/import', data=data) - return response + return self.api.post( + '/secrets/share/import', + data=data, + ) def list(self, sort=None, is_descending=False, filter_rules=None, constraints=None, **kwargs): @@ -211,19 +219,28 @@ def list(self, sort=None, is_descending=False, params['_sort'] = '-' + sort if is_descending else sort if filter_rules: - response = self.api.post('/searches/secrets', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/secrets', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Secret), + ) elif constraints: - response = self.api.post('/searches/secrets', params=params, - data={'constraints': constraints}) + return self.api.post( + '/searches/secrets', + params=params, + data={'constraints': constraints}, + wrapper=ListResponse.of(Secret), + ) else: - response = self.api.get('/secrets', params=params) - - return ListResponse([Secret(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/secrets', + params=params, + wrapper=ListResponse.of(Secret), + ) def delete(self, key): - self.api.delete('/secrets/{0}'.format(key)) + return self.api.delete('/secrets/{0}'.format(key)) def set_global(self, key): """ diff --git a/cloudify_rest_client/secrets_providers.py b/cloudify_rest_client/secrets_providers.py index a3f0c2dca..8e45b9de4 100644 --- a/cloudify_rest_client/secrets_providers.py +++ b/cloudify_rest_client/secrets_providers.py @@ -69,9 +69,10 @@ def __init__(self, api): self.api = api def get(self, name): - response = self.api.get(f'/secrets-providers/{name}') - - return SecretsProvider(response) + return self.api.get( + f'/secrets-providers/{name}', + wrapper=SecretsProvider, + ) def create( self, @@ -104,9 +105,11 @@ def create( if connection_parameters: data['connection_parameters'] = connection_parameters - response = self.api.put('/secrets-providers', data=data) - - return SecretsProvider(response) + return self.api.put( + '/secrets-providers', + data=data, + wrapper=SecretsProvider, + ) def update( self, @@ -138,15 +141,17 @@ def update( } data = dict((k, v) for k, v in data.items() if v is not None) - response = self.api.patch(f'/secrets-providers/{name}', data=data) - - return SecretsProvider(response) + return self.api.patch( + f'/secrets-providers/{name}', + data=data, + wrapper=SecretsProvider, + ) def delete(self, name): """ Delete a Secrets Provider. """ - self.api.delete(f'/secrets-providers/{name}') + return self.api.delete(f'/secrets-providers/{name}') def list(self): """ @@ -154,11 +159,9 @@ def list(self): :return: Secrets Parameters list. """ - response = self.api.get('/secrets-providers') - - return ListResponse( - [SecretsProvider(item) for item in response['items']], - response['metadata'], + return self.api.get( + '/secrets-providers', + wrapper=ListResponse.of(SecretsProvider), ) def check( @@ -190,6 +193,4 @@ def check( if connection_parameters: data['connection_parameters'] = connection_parameters - response = self.api.put('/secrets-providers', data=data) - - return response + return self.api.put('/secrets-providers', data=data) diff --git a/cloudify_rest_client/sites.py b/cloudify_rest_client/sites.py index e3edc8985..580592043 100644 --- a/cloudify_rest_client/sites.py +++ b/cloudify_rest_client/sites.py @@ -57,7 +57,6 @@ class SitesClient(object): def __init__(self, api): self.api = api self._uri_prefix = 'sites' - self._wrapper_cls = Site def create(self, name, location=None, visibility=VisibilityState.TENANT, created_by=None, created_at=None): @@ -79,11 +78,11 @@ def create(self, name, location=None, visibility=VisibilityState.TENANT, data['created_by'] = created_by if created_at: data['created_at'] = created_at - response = self.api.put( + return self.api.put( '/{self._uri_prefix}/{name}'.format(self=self, name=name), - data=data + data=data, + wrapper=Site, ) - return self._wrapper_cls(response) def update(self, name, location=None, visibility=VisibilityState.TENANT, new_name=None): @@ -104,11 +103,11 @@ def update(self, name, location=None, visibility=VisibilityState.TENANT, } # Remove the keys with value None data = dict((k, v) for k, v in data.items() if v is not None) - response = self.api.post( + return self.api.post( '/{self._uri_prefix}/{name}'.format(self=self, name=name), - data=data + data=data, + wrapper=Site, ) - return self._wrapper_cls(response) def get(self, name): """ @@ -117,10 +116,10 @@ def get(self, name): :param name: The name of the site :return: The details of the site """ - response = self.api.get( - '/{self._uri_prefix}/{name}'.format(self=self, name=name) + return self.api.get( + '/{self._uri_prefix}/{name}'.format(self=self, name=name), + wrapper=Site, ) - return self._wrapper_cls(response) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -137,12 +136,11 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: kwargs['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/{self._uri_prefix}'.format(self=self), - _include=_include, - params=kwargs) - return ListResponse( - [self._wrapper_cls(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/{self._uri_prefix}'.format(self=self), + _include=_include, + params=kwargs, + wrapper=ListResponse.of(Site), ) def delete(self, name): @@ -152,6 +150,6 @@ def delete(self, name): :param name: The name of the site to be deleted. :return: Deleted site. """ - self.api.delete( + return self.api.delete( '/{self._uri_prefix}/{name}'.format(self=self, name=name) ) diff --git a/cloudify_rest_client/snapshots.py b/cloudify_rest_client/snapshots.py index 602a2b3e7..8b0685c23 100644 --- a/cloudify_rest_client/snapshots.py +++ b/cloudify_rest_client/snapshots.py @@ -85,8 +85,7 @@ def get(self, snapshot_id, _include=None): """ assert snapshot_id uri = '/snapshots/{0}'.format(snapshot_id) - response = self.api.get(uri, _include=_include) - return Snapshot(response) + return self.api.get(uri, _include=_include, wrapper=Snapshot) def list(self, _include=None, sort=None, is_descending=False, **kwargs): """ @@ -103,9 +102,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/snapshots', params=params, _include=_include) - return ListResponse([Snapshot(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/snapshots', + params=params, + _include=_include, + wrapper=ListResponse.of(Snapshot), + ) def create(self, snapshot_id, @@ -131,8 +133,12 @@ def create(self, 'queue': queue, 'tempdir_path': tempdir_path, } - response = self.api.put(uri, data=params, expected_status_code=201) - return Execution(response) + return self.api.put( + uri, + data=params, + expected_status_code=201, + wrapper=Execution, + ) def delete(self, snapshot_id): """ @@ -142,7 +148,7 @@ def delete(self, snapshot_id): :return: Deleted snapshot. """ assert snapshot_id - self.api.delete('/snapshots/{0}'.format(snapshot_id)) + return self.api.delete('/snapshots/{0}'.format(snapshot_id)) def restore(self, snapshot_id, @@ -171,8 +177,7 @@ def restore(self, 'ignore_plugin_failure': ignore_plugin_failure } - response = self.api.post(uri, data=params) - return Execution(response) + return self.api.post(uri, data=params, wrapper=Execution) def upload(self, snapshot_path, @@ -206,9 +211,13 @@ def upload(self, progress_callback=progress_callback, client=self.api) - response = self.api.put(uri, params=query_params, data=data, - expected_status_code=201) - return Snapshot(response) + return self.api.put( + uri, + params=query_params, + data=data, + expected_status_code=201, + wrapper=Snapshot, + ) def download(self, snapshot_id, output_file, progress_callback=None): """ @@ -223,6 +232,7 @@ def download(self, snapshot_id, output_file, progress_callback=None): """ uri = '/snapshots/{0}/archive'.format(snapshot_id) + # TODO this is not async-friendly with contextlib.closing(self.api.get(uri, stream=True)) as response: output_file = bytes_stream_utils.write_response_stream_to_file( response, output_file, progress_callback=progress_callback) @@ -241,7 +251,7 @@ def update_status(self, snapshot_id, status, error=None): params = {'status': status} if error: params['error'] = error - self.api.patch(uri, data=params) + return self.api.patch(uri, data=params) def get_status(self): """ diff --git a/cloudify_rest_client/summary.py b/cloudify_rest_client/summary.py index faaeeea77..88ea7424a 100644 --- a/cloudify_rest_client/summary.py +++ b/cloudify_rest_client/summary.py @@ -27,11 +27,11 @@ def get(self, _target_field, _sub_field=None, **kwargs): '_sub_field': _sub_field, } params.update(kwargs) - response = self.api.get( + return self.api.get( '/summary/{summary_type}'.format(summary_type=self.summary_type), params=params, + wrapper=ListResponse.of(dict), ) - return ListResponse(response['items'], response['metadata']) class SummariesClient(object): diff --git a/cloudify_rest_client/tenants.py b/cloudify_rest_client/tenants.py index 3f0893428..4f145df8d 100644 --- a/cloudify_rest_client/tenants.py +++ b/cloudify_rest_client/tenants.py @@ -103,19 +103,20 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/tenants', - _include=_include, - params=params) - return ListResponse([Tenant(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/tenants', + _include=_include, + params=params, + wrapper=ListResponse.of(Tenant), + ) def create(self, tenant_name, rabbitmq_password=''): - response = self.api.post( + return self.api.post( '/tenants/{0}'.format(tenant_name), expected_status_code=201, data={'rabbitmq_password': rabbitmq_password}, + wrapper=Tenant, ) - return Tenant(response) def add_user(self, username, tenant_name, role): """Add user to a tenant. @@ -130,8 +131,7 @@ def add_user(self, username, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.put('/tenants/users', data=data) - return Tenant(response) + return self.api.put('/tenants/users', data=data, wrapper=Tenant) def update_user(self, username, tenant_name, role): """Update user in a tenant. @@ -149,8 +149,7 @@ def update_user(self, username, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.patch('/tenants/users', data=data) - return Tenant(response) + return self.api.patch('/tenants/users', data=data, wrapper=Tenant) def remove_user(self, username, tenant_name): data = {'username': username, 'tenant_name': tenant_name} @@ -169,8 +168,7 @@ def add_user_group(self, group_name, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.put('/tenants/user-groups', data=data) - return Tenant(response) + return self.api.put('/tenants/user-groups', data=data, wrapper=Tenant) def update_user_group(self, group_name, tenant_name, role): """Update user group in a tenant. @@ -188,8 +186,11 @@ def update_user_group(self, group_name, tenant_name, role): 'tenant_name': tenant_name, 'role': role, } - response = self.api.patch('/tenants/user-groups', data=data) - return Tenant(response) + return self.api.patch( + '/tenants/user-groups', + data=data, + wrapper=Tenant, + ) def remove_user_group(self, group_name, tenant_name): """Remove user group from tenant. @@ -201,14 +202,14 @@ def remove_user_group(self, group_name, tenant_name): """ data = {'group_name': group_name, 'tenant_name': tenant_name} - self.api.delete('/tenants/user-groups', data=data) + return self.api.delete('/tenants/user-groups', data=data) def get(self, tenant_name, **kwargs): - response = self.api.get( + return self.api.get( '/tenants/{0}'.format(tenant_name), - params=kwargs + params=kwargs, + wrapper=Tenant, ) - return Tenant(response) def delete(self, tenant_name): - self.api.delete('/tenants/{0}'.format(tenant_name)) + return self.api.delete('/tenants/{0}'.format(tenant_name)) diff --git a/cloudify_rest_client/tests/__init__.py b/cloudify_rest_client/tests/__init__.py index c03c6a1f3..b17a3a525 100644 --- a/cloudify_rest_client/tests/__init__.py +++ b/cloudify_rest_client/tests/__init__.py @@ -7,7 +7,14 @@ class MockHTTPClient(CloudifyClient.client_class): def __init__(self, *args, **kwargs): super(MockHTTPClient, self).__init__(*args, **kwargs) - self._do_request = mock.Mock() + self.do_request = mock.Mock(side_effect=self._fake_do_request) + + def _fake_do_request(self, *args, **kwargs): + data = self.do_request.return_value or {} + wrapper = kwargs.get('wrapper') + if wrapper: + return wrapper(data) + return data class MockClient(CloudifyClient): @@ -24,7 +31,7 @@ def __init__(self, **kwargs): @property def mock_do_request(self): - return self._client._do_request + return self._client.do_request def assert_last_mock_call(self, endpoint, data=None, params=None, expected_status_code=200, stream=False, @@ -32,17 +39,16 @@ def assert_last_mock_call(self, endpoint, data=None, params=None, if not params: params = {} - _, kwargs = self.mock_do_request.call_args_list[-1] + args, kwargs = self.mock_do_request.call_args_list[-1] - called_endpoint = kwargs['request_url'].rpartition('v3.1')[2] + method, called_endpoint = args assert endpoint == called_endpoint - assert data == kwargs['body'] + assert data == kwargs['data'] assert params == kwargs['params'] assert expected_status_code == kwargs['expected_status_code'] assert stream == kwargs['stream'] - - assert expected_method == kwargs['requests_method'].__name__ + assert expected_method == method.lower() @property def last_mock_call_headers(self): diff --git a/cloudify_rest_client/tests/test_tokens.py b/cloudify_rest_client/tests/test_tokens.py index c9b0beb82..1ad14d7c7 100644 --- a/cloudify_rest_client/tests/test_tokens.py +++ b/cloudify_rest_client/tests/test_tokens.py @@ -20,7 +20,7 @@ def test_token_create(): result = client.tokens.create() client.assert_last_mock_call(endpoint='/tokens', - data='{}', + data={}, expected_method='post') assert isinstance(result, Token) diff --git a/cloudify_rest_client/tokens.py b/cloudify_rest_client/tokens.py index 3be5c1148..6a80da13f 100644 --- a/cloudify_rest_client/tokens.py +++ b/cloudify_rest_client/tokens.py @@ -58,11 +58,10 @@ def list(self, **kwargs): :param kwargs: Optional fields or filter arguments as defined in the restservice. """ - response = self.api.get('/tokens', params=kwargs) - - return ListResponse( - [Token(item) for item in response['items']], - response['metadata'] + return self.api.get( + '/tokens', + params=kwargs, + wrapper=ListResponse.of(Token), ) def get(self, token_id): @@ -71,11 +70,11 @@ def get(self, token_id): :return: Token """ - return Token(self.api.get('/tokens/{}'.format(token_id))) + return self.api.get('/tokens/{}'.format(token_id), wrapper=Token) def delete(self, token_id): """Delete an existing token, revoking its access.""" - self.api.delete('/tokens/{}'.format(token_id)) + return self.api.delete('/tokens/{}'.format(token_id)) def create(self, description=None, expiration=None): """Create a new authentication token for the current user. @@ -94,7 +93,8 @@ def create(self, description=None, expiration=None): if expiration: parse_utc_datetime(expiration) data['expiration_date'] = expiration - return Token(self.api.post( + return self.api.post( '/tokens', data=data, - )) + wrapper=Token, + ) diff --git a/cloudify_rest_client/user_groups.py b/cloudify_rest_client/user_groups.py index e70780fe4..ae3beff73 100644 --- a/cloudify_rest_client/user_groups.py +++ b/cloudify_rest_client/user_groups.py @@ -78,11 +78,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/user-groups', - _include=_include, - params=params) - return ListResponse([Group(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/user-groups', + _include=_include, + params=params, + wrapper=ListResponse.of(Group), + ) def create(self, group_name, role, ldap_group_dn=None): data = { @@ -90,32 +91,39 @@ def create(self, group_name, role, ldap_group_dn=None): 'ldap_group_dn': ldap_group_dn, 'role': role } - response = self.api.post('/user-groups', - data=data, - expected_status_code=201) - return Group(response) + return self.api.post( + '/user-groups', + data=data, + expected_status_code=201, + wrapper=Group, + ) def get(self, group_name, **kwargs): - response = self.api.get( + return self.api.get( '/user-groups/{0}'.format(group_name), - params=kwargs + params=kwargs, + wrapper=Group, ) - return Group(response) def delete(self, group_name): - self.api.delete('/user-groups/{0}'.format(group_name)) + return self.api.delete('/user-groups/{0}'.format(group_name)) def set_role(self, group_name, new_role): data = {'role': new_role} - response = self.api.post('/user-groups/{0}'.format(group_name), - data=data) - return Group(response) + return self.api.post( + '/user-groups/{0}'.format(group_name), + data=data, + wrapper=Group, + ) def add_user(self, username, group_name): data = {'username': username, 'group_name': group_name} - response = self.api.put('/user-groups/users', data=data) - return Group(response) + return self.api.put( + '/user-groups/users', + data=data, + wrapper=Group, + ) def remove_user(self, username, group_name): data = {'username': username, 'group_name': group_name} - self.api.delete('/user-groups/users', data=data) + return self.api.delete('/user-groups/users', data=data) diff --git a/cloudify_rest_client/users.py b/cloudify_rest_client/users.py index fed8067f3..66988741a 100644 --- a/cloudify_rest_client/users.py +++ b/cloudify_rest_client/users.py @@ -113,11 +113,12 @@ def list(self, _include=None, sort=None, is_descending=False, **kwargs): if sort: params['_sort'] = '-' + sort if is_descending else sort - response = self.api.get('/users', - _include=_include, - params=params) - return ListResponse([User(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/users', + _include=_include, + params=params, + wrapper=ListResponse.of(User), + ) def create(self, username, password, role, is_prehashed=None, created_at=None, first_login_at=None, last_login_at=None): @@ -130,55 +131,67 @@ def create(self, username, password, role, is_prehashed=None, data['first_login_at'] = first_login_at if last_login_at: data['last_login_at'] = last_login_at - response = self.api.put('/users', data=data, expected_status_code=201) - return User(response) + return self.api.put( + '/users', + data=data, + expected_status_code=201, + wrapper=User, + ) def set_password(self, username, new_password): data = {'password': new_password} - response = self.api.post('/users/{0}'.format(username), data=data) - return User(response) + return self.api.post( + '/users/{0}'.format(username), + data=data, + wrapper=User, + ) def set_role(self, username, new_role): data = {'role': new_role} - response = self.api.post('/users/{0}'.format(username), data=data) - return User(response) + return self.api.post( + '/users/{0}'.format(username), + data=data, + wrapper=User, + ) def set_show_getting_started(self, username, flag_value): data = {'show_getting_started': flag_value} - response = self.api.post('/users/{0}'.format(username), data=data) - return User(response) + return self.api.post( + '/users/{0}'.format(username), + data=data, + wrapper=User, + ) def get(self, username, **kwargs): - response = self.api.get( + return self.api.get( '/users/{0}'.format(username), - params=kwargs + params=kwargs, + wrapper=User, ) - return User(response) def get_self(self, **kwargs): - response = self.api.get('/user', params=kwargs) - return User(response) + return self.api.get('/user', params=kwargs, wrapper=User) def delete(self, username): - self.api.delete('/users/{0}'.format(username)) + return self.api.delete('/users/{0}'.format(username)) def activate(self, username): - response = self.api.post( + return self.api.post( '/users/active/{0}'.format(username), - data={'action': 'activate'} + data={'action': 'activate'}, + wrapper=User, ) - return User(response) def deactivate(self, username): - response = self.api.post( + return self.api.post( '/users/active/{0}'.format(username), - data={'action': 'deactivate'} + data={'action': 'deactivate'}, + wrapper=User, ) - return User(response) def unlock(self, username, **kwargs): - response = self.api.post( + return self.api.post( '/users/unlock/{0}'.format(username), - params=kwargs + params=kwargs, + wrapper=User, ) - return User(response) diff --git a/cloudify_rest_client/workflows.py b/cloudify_rest_client/workflows.py index b7414063d..2df9ceae6 100644 --- a/cloudify_rest_client/workflows.py +++ b/cloudify_rest_client/workflows.py @@ -58,11 +58,15 @@ def list(self, filter_id=None, filter_rules=None, **kwargs): params['_filter_id'] = filter_id if filter_rules: - response = self.api.post('/searches/workflows', params=params, - data={'filter_rules': filter_rules}) + return self.api.post( + '/searches/workflows', + params=params, + data={'filter_rules': filter_rules}, + wrapper=ListResponse.of(Workflow), + ) else: - response = self.api.get('/workflows', params=params) - - return ListResponse( - [Workflow(item) for item in response['items']], - response['metadata']) + return self.api.get( + '/workflows', + params=params, + wrapper=ListResponse.of(Workflow), + )