diff --git a/.github/workflows/build_and_load_to_pypi.yaml b/.github/workflows/build_and_load_to_pypi.yaml index d68a50f..fc704a9 100644 --- a/.github/workflows/build_and_load_to_pypi.yaml +++ b/.github/workflows/build_and_load_to_pypi.yaml @@ -2,11 +2,13 @@ name: Building artifacts and load them to PYPI index on: push: branches: [ main ] + paths: + - 'VERSION.txt' jobs: Build_release: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 steps: - uses: actions/setup-python@v4 diff --git a/.github/workflows/pr_checks.yaml b/.github/workflows/pr_checks.yaml index fe28091..5f5a3ac 100644 --- a/.github/workflows/pr_checks.yaml +++ b/.github/workflows/pr_checks.yaml @@ -2,16 +2,18 @@ name: PR checks on: pull_request: branches: [ main, dev ] + paths: + - 'zephyr/**' push: branches: [ dev ] jobs: Run_PR_checks: - runs-on: ubuntu-latest + runs-on: ubuntu-20.04 strategy: matrix: - python-version: [ '3.7', '3.8', '3.9' ] + python-version: [ '3.6', '3.7', '3.8', '3.9', '3.10', '3.11' ] steps: - name: Check out repository code @@ -37,4 +39,4 @@ jobs: pipenv run pip list --local - name: Test with tox - run: pipenv run tox \ No newline at end of file + run: pipenv run tox diff --git a/Pipfile b/Pipfile index 8616edc..76d1785 100644 --- a/Pipfile +++ b/Pipfile @@ -14,6 +14,3 @@ pylint = "*" build = "*" twine = "*" tox-gh-actions = "*" - -[requires] -python_version = "3.8" diff --git a/README.md b/README.md index 561c8fe..d4d8dc7 100644 --- a/README.md +++ b/README.md @@ -5,13 +5,13 @@ ![PyPI](https://img.shields.io/pypi/v/zephyr-python-api) ![PyPI - License](https://img.shields.io/pypi/l/zephyr-python-api) ### Project description -This is a set of wrappers for Zephyr Scale (TM4J) REST API. This means you can interact with Zephyr Scale without GUI, access it with python code and create automation scripts for your every day interactions. - -NOTE: Currently only Scale Cloud wrappers are implemented. +This is a set of wrappers for Zephyr Scale (TM4J) REST API. This means you can interact with Zephyr Scale without GUI, access it with python code and create automation scripts for your every day interactions. To be done: -* Scale Server wrappers implementations -* Usage examples +* More usage examples +* Tests, tests and tests for gods of testing +* Convenient docs +* Implementing higher level wrappers representing Test Case, Test Cycle, etc. ### Installation @@ -30,7 +30,7 @@ zscale = ZephyrScale(token=) Zephyr Server (TM4J) auth: ```python -from zephyr import API_V1, ZephyrScale +from zephyr import ZephyrScale # Auth can be made with Jira token auth = {"token": ""} @@ -41,7 +41,7 @@ auth = {"username": "", "password": ""} # or even session cookie dict auth = {"cookies": ""} -zscale = ZephyrScale(api=API_V1, base_url=, **auth) +zscale = ZephyrScale.server_api(base_url=, **auth) ``` Then it is possible to interact with api wrappers: @@ -58,6 +58,10 @@ test_case = zapi.test_cases.get_test_case("") creation_result = zapi.test_cases.create_test_case("", "test_case_name") ``` +### Troubleshooting + +For troubleshooting see [TROUBLESHOOTING.md](TROUBLESHOOTING.md) + ### License diff --git a/TROUBLESHOOTING.md b/TROUBLESHOOTING.md new file mode 100644 index 0000000..66dc5e4 --- /dev/null +++ b/TROUBLESHOOTING.md @@ -0,0 +1,5 @@ +## Reporting to Zephyr + +- The Cucumber format is different from Behave reporter format. +In case you want to report test executions from output Behave file, +please use some custom formatter for Behave output, i.e. https://pypi.org/project/behave-cucumber-formatter/. diff --git a/VERSION.txt b/VERSION.txt new file mode 100644 index 0000000..6812f81 --- /dev/null +++ b/VERSION.txt @@ -0,0 +1 @@ +0.0.3 \ No newline at end of file diff --git a/examples/server.py b/examples/server.py new file mode 100644 index 0000000..9e95ad1 --- /dev/null +++ b/examples/server.py @@ -0,0 +1,72 @@ +""" +Usage examples of Zephyr Scale Server API wrappers. +""" +import logging + +from zephyr import ZephyrScale + + +# Enable logging with level Debug for more verbosity +logging.basicConfig(level=logging.DEBUG) + + +# Specify your Jira context to operate with: +base_url = "https://http://localhost:2990/jira/rest/atm/1.0/" + + +# Deside the type of authorization. It could be a token, username/password or cookies +auth = {"token": ""} +# auth = {"username": "", "password": "your_pass"} +# auth = {"cookies": ""} + +# Create an instance of Zephyr Scale +zscale = ZephyrScale.server_api(base_url=base_url, **auth) + + +# Now we can start playing with the Zephyr API! +test_cases = zscale.api.test_cases + +# Get a test case: +case_data = test_cases.get_test_case("") + +# Create a test case: +creation_data = test_cases.create_test_case("", "Test case name") + +# Update a test case: +test_script = { + "type": "STEP_BY_STEP", + "steps": [ + { + "description": "Description for the step 1", + "testData": "Some test data", + "expectedResult": "Expectations" + }, + { + "description": "Step 2 description", + "testData": "Some more test data", + "expectedResult": "Expected result for the step 2" + }]} +update_data = test_cases.update_test_case("", + objective=f"New_test_objective", + testScript=test_script) + +# Delete a test case: +deleted = test_cases.delete_test_case("") + +# Get test case attachments: +attachments = test_cases.get_attachments("") + +# Create a test case attachment (upload): +upload_result = test_cases.create_attachment("", "path_to_attachment_file") + +# Get the latest execution result for the test case: +execution_data = test_cases.get_latest_result("") + +# Get attachments for a specified step: +test_cases.get_step_attachments("", "") + +# Create an attachment for step: +test_cases.create_step_attachment("", "", "path_to_attachment_file") + +# Search test cases with JQL: +search = test_cases.search_cases(query='projectKey = ""') diff --git a/setup.cfg b/setup.cfg index f7396cd..842ad07 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = zephyr-python-api -version = 0.0.1.dev2 +version = file: VERSION.txt author = Petr Sharapenko author_email = nassauwinter@gmail.com description = Zephyr (TM4J) Python REST API wrapper @@ -10,14 +10,18 @@ url = https://github.com/nassauwinter/zephyr-python-api project_urls = Bug Tracker = https://github.com/nassauwinter/zephyr-python-api/issues classifiers = + Programming Language :: Python :: 3.6 Programming Language :: Python :: 3.7 Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + Programming Language :: Python :: 3.11 License :: OSI Approved :: Apache Software License Operating System :: OS Independent [options] packages = find: -python_requires = >=3.7 +python_requires = >=3.6 install_requires = requests diff --git a/tests/pytest.ini b/tests/pytest.ini new file mode 100644 index 0000000..c34d4cf --- /dev/null +++ b/tests/pytest.ini @@ -0,0 +1,9 @@ +[pytest] +markers = + unit: test Zephyr Scale modules + integration: test integration with Server/Cloud + +log_cli = True +log_cli_level = DEBUG +log_cli_format = %(asctime)s %(levelname)s %(message)s +log_cli_date_format = %Y-%m-%d %H:%M:%S \ No newline at end of file diff --git a/tests/test_scale.py b/tests/test_scale.py deleted file mode 100644 index 00b2958..0000000 --- a/tests/test_scale.py +++ /dev/null @@ -1,54 +0,0 @@ -from logging import Logger - -import pytest - -from zephyr.scale.scale import DEFAULT_BASE_URL, ZephyrScale - - -ZSESSION_PATH = "zephyr.scale.scale.ZephyrSession" -CLOUD_API_WRAP_PATH = "zephyr.scale.scale.CloudApiWrapper" - - -@pytest.mark.parametrize("creation_kwargs, exp_url", [({}, DEFAULT_BASE_URL), - ({"base_url": DEFAULT_BASE_URL}, DEFAULT_BASE_URL), - ({"base_url": "test.com"}, "test.com")]) -def test_scale_session_creation(creation_kwargs, exp_url, mocker): - session_mock = mocker.patch(ZSESSION_PATH) - mocker.patch(CLOUD_API_WRAP_PATH) - - ZephyrScale(**creation_kwargs) - - session_mock.assert_called_once_with(base_url=exp_url) - - -@pytest.mark.parametrize("creation_kwargs, api_version", [({}, CLOUD_API_WRAP_PATH), - ({"api_version": "v2"}, CLOUD_API_WRAP_PATH), - ({"api_version": "V2"}, CLOUD_API_WRAP_PATH)]) -def test_scale_defining_version(creation_kwargs, api_version, mocker): - zsession_mock = mocker.patch(ZSESSION_PATH) - wrapper_mock = mocker.patch(api_version) - - zephyr = ZephyrScale(**creation_kwargs) - - assert isinstance(zephyr, ZephyrScale), f"Resulted object should be instance of {ZephyrScale}" - wrapper_mock.assert_called_once_with(zsession_mock()) - - -@pytest.mark.parametrize("creation_kwargs, exception", - [({"api_version": "v"}, ValueError), - ({"api_version": "v1"}, NotImplementedError), - ({"api_version": "V1"}, NotImplementedError)]) -def test_scale_defining_version_exceptions(creation_kwargs, exception, mocker): - mocker.patch(ZSESSION_PATH) - - with pytest.raises(exception): - ZephyrScale(**creation_kwargs) - - -def test_scale_logger(mocker): - mocker.patch(ZSESSION_PATH) - mocker.patch(CLOUD_API_WRAP_PATH) - - zephyr = ZephyrScale() - - assert isinstance(zephyr.logger, Logger) diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_scale.py b/tests/unit/test_scale.py new file mode 100644 index 0000000..dcae06c --- /dev/null +++ b/tests/unit/test_scale.py @@ -0,0 +1,63 @@ +from logging import Logger + +import pytest + +from zephyr.scale.scale import DEFAULT_BASE_URL, ZephyrScale + + +ZSESSION_PATH = "zephyr.scale.scale.ZephyrSession" +CLOUD_API_WRAP_PATH = "zephyr.scale.scale.CloudApiWrapper" +SERVER_API_WRAP_PATH = "zephyr.scale.scale.ServerApiWrapper" + + +@pytest.mark.unit +class TestZephyrScale: + @pytest.mark.parametrize("creation_kwargs, exp_url", [({}, DEFAULT_BASE_URL), + ({"base_url": DEFAULT_BASE_URL}, DEFAULT_BASE_URL), + ({"base_url": "test.com"}, "test.com")]) + def test_scale_session_creation(self, creation_kwargs, exp_url, mocker): + session_mock = mocker.patch(ZSESSION_PATH) + mocker.patch(CLOUD_API_WRAP_PATH) + + ZephyrScale(**creation_kwargs) + + session_mock.assert_called_once_with(base_url=exp_url) + + @pytest.mark.parametrize("creation_kwargs, api_version", [({}, CLOUD_API_WRAP_PATH), + ({"api_version": "v2"}, CLOUD_API_WRAP_PATH), + ({"api_version": "V2"}, CLOUD_API_WRAP_PATH), + ({"api_version": "v1"}, SERVER_API_WRAP_PATH), + ({"api_version": "V1"}, SERVER_API_WRAP_PATH)]) + def test_scale_defining_version(self, creation_kwargs, api_version, mocker): + zsession_mock = mocker.patch(ZSESSION_PATH) + wrapper_mock = mocker.patch(api_version) + + zephyr = ZephyrScale(**creation_kwargs) + + assert isinstance(zephyr, ZephyrScale), f"Resulted object should be instance of {ZephyrScale}" + wrapper_mock.assert_called_once_with(zsession_mock()) + + @pytest.mark.parametrize("creation_kwargs, api_version", [({"base_url": "test.com"}, SERVER_API_WRAP_PATH)]) + def test_server_cls_method(self, creation_kwargs, api_version, mocker): + zsession_mock = mocker.patch(ZSESSION_PATH) + wrapper_mock = mocker.patch(api_version) + + zephyr = ZephyrScale.server_api(**creation_kwargs) + assert isinstance(zephyr, ZephyrScale), f"Resulted object should be instance of {ZephyrScale}" + wrapper_mock.assert_called_once_with(zsession_mock()) + + @pytest.mark.parametrize("creation_kwargs, exception", + [({"api_version": "v"}, ValueError)]) + def test_scale_defining_version_exceptions(self, creation_kwargs, exception, mocker): + mocker.patch(ZSESSION_PATH) + + with pytest.raises(exception): + ZephyrScale(**creation_kwargs) + + def test_scale_logger(self, mocker): + mocker.patch(ZSESSION_PATH) + mocker.patch(CLOUD_API_WRAP_PATH) + + zephyr = ZephyrScale() + + assert isinstance(zephyr.logger, Logger) diff --git a/tests/unit/test_zephyr_session.py b/tests/unit/test_zephyr_session.py new file mode 100644 index 0000000..0d2433c --- /dev/null +++ b/tests/unit/test_zephyr_session.py @@ -0,0 +1,95 @@ +import pytest +from requests import Session + +from zephyr.scale.scale import DEFAULT_BASE_URL, ZephyrSession +from zephyr.scale.zephyr_session import INIT_SESSION_MSG, InvalidAuthData + +REQUESTS_SESSION_PATH = "requests.sessions.Session" +GETLOGGER_PATH = "logging.getLogger" +LOGGER_DEBUG_PATH = "logging.Logger.debug" + + +@pytest.mark.unit +class TestZephyrSession: + def test_creation(self, mocker): + """Tests basic creation logic""" + logger_mock = mocker.patch(GETLOGGER_PATH) + + zsession = ZephyrSession(DEFAULT_BASE_URL, token="token_test") + + assert zsession.base_url == DEFAULT_BASE_URL, (f"Attribute base_url expected to be {DEFAULT_BASE_URL}, " + f"not {zsession.base_url}") + assert isinstance(zsession._session, Session) + logger_mock.assert_called_with("zephyr.scale.zephyr_session") + + def test_token_auth(self, mocker): + """Test token auth""" + token = "test_token" + logger_mock = mocker.patch(LOGGER_DEBUG_PATH) + + zsession = ZephyrSession(DEFAULT_BASE_URL, token=token) + + logger_mock.assert_called_with(INIT_SESSION_MSG.format("token")) + assert f"Bearer {token}" == zsession._session.headers.get("Authorization") + + def test_credentials_auth(self, mocker): + """Test auth with username and password""" + username = "usertest" + password = "pwdtest" + logger_mock = mocker.patch(LOGGER_DEBUG_PATH) + + zsession = ZephyrSession(DEFAULT_BASE_URL, username=username, password=password) + + logger_mock.assert_called_with(INIT_SESSION_MSG.format("username and password")) + assert (username, password) == zsession._session.auth + + def test_cookie_auth(self, mocker): + """Test auth with cookie dict""" + test_cookie = {"cookies": {"cookie.token": "cookie_test"}} + logger_mock = mocker.patch(LOGGER_DEBUG_PATH) + + zsession = ZephyrSession(DEFAULT_BASE_URL, cookies=test_cookie) + + logger_mock.assert_called_with(INIT_SESSION_MSG.format("cookies")) + assert test_cookie['cookies'] in zsession._session.cookies.values() + + @pytest.mark.parametrize("auth_data, exception", [(dict(), InvalidAuthData), + ({"username": "user"}, InvalidAuthData), + ({"password": "pwd"}, InvalidAuthData)]) + def test_auth_exception(self, auth_data, exception): + """Test exceptions on auth""" + with pytest.raises(exception): + ZephyrSession(DEFAULT_BASE_URL, **auth_data) + + @pytest.mark.parametrize("creation_kwargs", + [{"token": "token_test", + "session_attrs": {'verify': False, "max_redirects": 333}}]) + def test_requests_session_attrs(self, creation_kwargs, mocker): + """The test checks ZephyrScale (not) provided with "session_attrs".""" + logger_mock = mocker.patch(LOGGER_DEBUG_PATH) + session_attrs = creation_kwargs.get('session_attrs') + + zsession = ZephyrSession(DEFAULT_BASE_URL, **creation_kwargs) + + logger_mock.assert_called_with( + f"Modify requests session object with {session_attrs}") + + for attrib, value in session_attrs.items(): + actual = getattr(zsession._session, attrib) + assert actual == session_attrs[attrib], (f"Request session attr {attrib} is {actual}, " + f"but expected{session_attrs[attrib]}") + + @pytest.mark.skip + def test_crud_request(self): + """Test GET, POST, PUT, DELETE requests""" + pass + + @pytest.mark.skip + def test_get_paginated(self): + """Test paginated request""" + pass + + @pytest.mark.skip + def test_post_file(self): + """Test Post file wrapper""" + pass diff --git a/tox.ini b/tox.ini index 0badb06..953f7f2 100644 --- a/tox.ini +++ b/tox.ini @@ -1,16 +1,22 @@ [tox] isolated_build = True -envlist = py37, py38, py39 +envlist = py36, py37, py38, py39, py310, py311 [gh-actions] python = + 3.6: py36 3.7: py37 3.8: py38 3.9: py39 + 3.10: py310 + 3.11: py311 [testenv] deps = pipenv commands= pipenv install --dev --skip-lock - pylint --rc-file .pylintrc zephyr - pipenv run pytest tests -v + python --version + pip --version + pylint --version + pylint --rcfile .pylintrc zephyr + pipenv run pytest tests -m unit -v diff --git a/zephyr/__init__.py b/zephyr/__init__.py index 7aa565d..3232bda 100644 --- a/zephyr/__init__.py +++ b/zephyr/__init__.py @@ -1 +1,2 @@ from zephyr.scale import API_V1, API_V2, ZephyrScale +from zephyr.utils.common import cookie_str_to_dict diff --git a/zephyr/scale/cloud/endpoints/automations.py b/zephyr/scale/cloud/endpoints/automations.py index b325ad7..093a985 100644 --- a/zephyr/scale/cloud/endpoints/automations.py +++ b/zephyr/scale/cloud/endpoints/automations.py @@ -1,3 +1,5 @@ +from json import dumps + from ...zephyr_session import ZephyrSession @@ -7,25 +9,99 @@ class AutomationEndpoints: def __init__(self, session: ZephyrSession): self.session = session - def post_custom_format(self): + def _post_reports(self, + path, + project_key, + file_path, + auto_create=False, + test_cycle=None, + **kwargs): + """ + Post various reports logic. + + :param path: str with resource path + :param project_key: str with project key + :param file_path: str with path to .zip archive with report files + :param auto_create: indicate if test cases should be created if non existent + :param test_cycle: dict with test cycle description data + """ + params = {'projectKey': project_key} + to_files = None + + if auto_create: + params.update({'autoCreateTestCases': True}) + + if test_cycle: + to_files = {'testCycle': (None, dumps(test_cycle), 'application/json')} + + return self.session.post_file(path, + file_path, + to_files=to_files, + params=params, + **kwargs) + + def post_custom_format(self, + project_key, + file_path, + auto_create=False, + test_cycle=None, + **kwargs): """ Create results using Zephyr Scale's custom results format. + + :param project_key: str with project key + :param file_path: str with path to .zip archive with report files + :param auto_create: indicate if test cases should be created if non existent + :param test_cycle: dict with test cycle description data """ - raise NotImplementedError + return self._post_reports('automations/executions/custom', + project_key=project_key, + file_path=file_path, + auto_create=auto_create, + test_cycle=test_cycle, + **kwargs) - def post_cucumber_format(self): + def post_cucumber_format(self, + project_key, + file_path, + auto_create=False, + test_cycle=None, + **kwargs): """ Create results using the Cucumber results format. + + :param project_key: str with project key + :param file_path: str with path to .zip archive with report files + :param auto_create: indicate if test cases should be created if non existent + :param test_cycle: dict with test cycle description data """ - raise NotImplementedError + return self._post_reports('automations/executions/cucumber', + project_key=project_key, + file_path=file_path, + auto_create=auto_create, + test_cycle=test_cycle, + **kwargs) - def post_junit_xml_format(self): + def post_junit_xml_format(self, + project_key, + file_path, + auto_create=False, + test_cycle=None, + **kwargs): """ - Create results using the JUnit XML results format. Optionally, - you can send a 'testCycle' part in your form data to customize - the created test cycle. + Create results using the JUnit XML results format. + + :param project_key: str with project key + :param file_path: str with path to .zip archive with report files + :param auto_create: indicate if test cases should be created if non existent + :param test_cycle: dict with test cycle description data """ - raise NotImplementedError + return self._post_reports('automations/executions/junit', + project_key=project_key, + file_path=file_path, + auto_create=auto_create, + test_cycle=test_cycle, + **kwargs) def get_testcases_zip(self, project_key): """ diff --git a/zephyr/scale/scale.py b/zephyr/scale/scale.py index ad258be..26a29ab 100644 --- a/zephyr/scale/scale.py +++ b/zephyr/scale/scale.py @@ -2,6 +2,7 @@ from zephyr.scale.zephyr_session import ZephyrSession from zephyr.scale.cloud.cloud_api import CloudApiWrapper +from zephyr.scale.server.server_api import ServerApiWrapper DEFAULT_BASE_URL = "https://api.zephyrscale.smartbear.com/v2/" @@ -26,7 +27,12 @@ def __init__(self, base_url=None, api_version=API_V2, **kwargs): if api_version.lower() == API_V2: self.api = CloudApiWrapper(session) elif api_version.lower() == API_V1: - raise NotImplementedError("Server api wrappers have not been implemented yet.") + self.api = ServerApiWrapper(session) else: raise ValueError("API version should be either 'v1' (Server) or 'v2' (Cloud)") self.logger = logging.getLogger(__name__) + + @classmethod + def server_api(cls, base_url, **kwargs): + """Alternative constructor for Zephyr Scale Server client""" + return cls(base_url=base_url, api_version=API_V1, **kwargs) diff --git a/zephyr/scale/server/__init__.py b/zephyr/scale/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zephyr/scale/server/endpoints/__init__.py b/zephyr/scale/server/endpoints/__init__.py new file mode 100644 index 0000000..f310e0a --- /dev/null +++ b/zephyr/scale/server/endpoints/__init__.py @@ -0,0 +1,12 @@ +from .endpoints import (AttachmentEndpoints, + AutomationEndpoints, + CustomFieldEndpoints, + DeleteExecutionEndpoints, + EnvironmentEndpoints, + FolderEndpoints, + IssueLinkEndpoints, + ProjectEndpoints, + TestCaseEndpoints, + TestPlanEndpoints, + TestResultEndpoints, + TestRunEndpoints) diff --git a/zephyr/scale/server/endpoints/endpoints.py b/zephyr/scale/server/endpoints/endpoints.py new file mode 100644 index 0000000..8b3f9cc --- /dev/null +++ b/zephyr/scale/server/endpoints/endpoints.py @@ -0,0 +1,370 @@ +from ...zephyr_session import ZephyrSession +from .paths import ServerPaths as Paths + + +class EndpointTemplate: + """Class with basic constructor for endpoint classes""" + def __init__(self, session: ZephyrSession): + self.session = session + + +class TestCaseEndpoints(EndpointTemplate): + """Api wrapper for "Test Case" endpoints""" + + def create_test_case(self, project_key, name, **kwargs): + """Creates a new Test Case""" + json = {"projectKey": project_key, + "name": name} + json.update(kwargs) + return self.session.post(Paths.CASE, json=json) + + def get_test_case(self, test_case_key, **params): + """Retrieve the Test Case matching the given key""" + return self.session.get(Paths.CASE_KEY.format(test_case_key), + params=params) + + def update_test_case(self, test_case_key, **json): + """Updates a Test Case""" + return self.session.put(Paths.CASE_KEY.format(test_case_key), + json=json) + + def delete_test_case(self, test_case_key): + """Delete the Test Case matching the given key""" + return self.session.delete(Paths.CASE_KEY.format(test_case_key)) + + def get_attachments(self, test_case_key): + """Retrieve the Test Case Attachments matching the given key""" + return self.session.get(Paths.CASE_ATTACH.format(test_case_key)) + + def create_attachment(self, test_case_key, file_path): + """Create a new attachment on the specified Test Case""" + return self.session.post_file(Paths.CASE_ATTACH.format(test_case_key), + file_path) + + def get_latest_result(self, test_case_key): + """Retrieve the last test result for a given key""" + return self.session.get(Paths.CASE_LATEST_RES.format(test_case_key)) + + def get_step_attachments(self, test_case_key, step_index): + """Retrieve the attachments for a test case step""" + return self.session.get(Paths.CASE_STP_ATTACH.format(test_case_key, step_index)) + + def create_step_attachment(self, test_case_key, step_index, file_path): + """Create a new attachment on the specified Step of a Test Case""" + return self.session.post(Paths.CASE_STP_ATTACH.format(test_case_key, step_index), + file_path) + + def search_cases(self, query, **params): + """Retrieve the Test Cases that matches the query passed as parameter""" + params.update({"query": query}) + return self.session.get(Paths.CASE_SEARCH, params=params) + + def get_all_versions(self, test_case_key, **params): + """Get all test case versions ids by its key name. Undocumented in API""" + return self.session.get(Paths.CASE_VERS.format(test_case_key), params=params) + + +class TestPlanEndpoints(EndpointTemplate): + """Api wrapper for "Test Plan" endpoints""" + + def create_test_plan(self, project_key, name, **kwargs): + """Creates a new Test Plan""" + json = {"projectKey": project_key, + "name": name} + json.update(kwargs) + return self.session.post(Paths.PLAN, json=json) + + def get_test_plan(self, test_plan_key, **params): + """Retrieve the Test Plan matching the given key""" + return self.session.get(Paths.PLAN_KEY.format(test_plan_key), + params=params) + + def update_test_plan(self, test_plan_key, **json): + """Updates a Test Plan""" + return self.session.put(Paths.PLAN_KEY.format(test_plan_key), + json=json) + + def delete_test_plan(self, test_plan_key): + """Delete the Test Plan matching the given key""" + return self.session.delete(Paths.PLAN_KEY.format(test_plan_key)) + + def get_attachments(self, test_plan_key): + """Retrieve the Test Plan Attachments matching the given key""" + return self.session.get(Paths.PLAN_ATTACH.format(test_plan_key)) + + def create_attachments(self, test_plan_key, file_path): + """Create a new attachment on the specified Test Plan""" + return self.session.post_file(Paths.PLAN_ATTACH.format(test_plan_key), + file_path) + + def search_plans(self, query, **params): + """Retrieve the Test Plans that matches the query passed as parameter""" + params.update({"query": query}) + return self.session.get(Paths.PLAN_SEARCH, params=params) + + +class TestRunEndpoints(EndpointTemplate): + """Api wrapper for "Test Run" endpoints""" + + def create_test_run(self, project_key, name, **kwargs): + """Creates a new Test Run""" + json = {"projectKey": project_key, + "name": name} + json.update(kwargs) + return self.session.post(Paths.RUN, json=json) + + def get_test_run(self, test_run_key, **params): + """Retrieve the Test Run matching the given key""" + return self.session.get(Paths.RUN_KEY.format(test_run_key), + params=params) + + def delete_test_run(self, test_run_key): + """Delete the Test Run matching the given key""" + return self.session.delete(Paths.RUN_KEY.format(test_run_key)) + + def get_attachments(self, test_run_key): + """Retrieve the Test Run Attachments matching the given key""" + return self.session.get(Paths.RUN_ATTACH.format(test_run_key)) + + def create_attachments(self, test_run_key, file_path): + """Create a new attachment on the specified Test Run""" + return self.session.post_file(Paths.RUN_ATTACH.format(test_run_key), + file_path) + + def create_test_result(self, test_run_key, test_case_key, **json): + """ + Creates a new Test Result on the specified Test Run, looking for an item that matches + the testCaseKey and the query string filter parameters. + """ + return self.session.post(Paths.RUN_TEST_RESULT.format(test_run_key, test_case_key), + json=json) + + def update_test_result(self, test_run_key, test_case_key, **json): + """ + Updates the last Test Result on the specified Test Run, looking for an item that matches + the testCaseKey and the query string filter parameters. Only defined fields will be updated. + """ + return self.session.post(Paths.RUN_TEST_RESULT.format(test_run_key, test_case_key), + json=json) + + def get_test_results(self, test_run_key): + """Retrieve All Test Results linked to a Test Run""" + return self.session.get(Paths.RUN_TEST_RESULTS.format(test_run_key)) + + def create_test_results(self, test_run_key, results): + """ + Create new Test Results on the specified Test Run, looking for items that match + the testCaseKey for each body item. + """ + return self.session.post(Paths.RUN_TEST_RESULTS.format(test_run_key), + json=results) + + def search_runs(self, query, **params): + """Retrieve the Test Runs that matches the query passed as parameter""" + params.update({"query": query}) + return self.session.get(Paths.RUN_SEARCH, params=params) + + +class TestResultEndpoints(EndpointTemplate): + """Api wrapper for "Test Result" endpoints""" + + def create_test_result(self, project_key, test_case_key, **json): + """Creates a new Test Result for a Test Case""" + data = {"projectKey": project_key, + "testCaseKey": test_case_key} + data.update(json) + return self.session.post(Paths.RES, json=data) + + def get_attachments(self, test_result_id): + """Retrieve the Test Result Attachments matching the given id""" + return self.session.get(Paths.RES_ATTACH.format(test_result_id)) + + def create_attachment(self, test_result_id, file_path): + """Create a new attachment on the specified Test Result""" + return self.session.post_file(Paths.RES_ATTACH.format(test_result_id), file_path) + + def get_step_attachments(self, test_result_id, step_id): + """ + Retrieve the Test Result Step Attachments matching the given testResultId and + stepIndex + """ + return self.session.get(Paths.RES_STP_ATTACH.format(test_result_id, step_id)) + + def create_step_attachment(self, test_result_id, step_id, file_path): + """Create a new attachment on the specified step of the Test Result""" + return self.session.post_file(Paths.RES_STP_ATTACH.format(test_result_id, step_id), + file_path) + + +class IssueLinkEndpoints(EndpointTemplate): + """Api wrapper for "Issue Link" endpoints""" + + def get_issue_links(self, issue_key, **params): + """Retrieve all Test Cases linked to an Issue""" + return self.session.get(Paths.ISSUE_CASES.format(issue_key), params=params) + + +class FolderEndpoints(EndpointTemplate): + """Api wrapper for "Folder" endpoints""" + + def create_folder(self, project_key, name, folder_type): + """ + Creates a new folder for test cases, test plans or test runs. + + In order to create a new folder you must POST a json with 3 fields: projectKey, + name and type. The field type can be filled with TEST_CASE, TEST_PLAN or TEST_RUN. + """ + json = {"projectKey": project_key, + "name": name, + "type": folder_type} + return self.session.post(Paths.FOLDER, json=json) + + def update_folder(self, folder_id, **json): + """ + Updates a folder for test cases, test plans or test runs. + + You can only update the name or the custom field value of a folder, in order to do that + you must PUT a json with 2 fields: name and customFields. The field name is a String and + forward and backslashes are not allowed. The field customFields is an object with + the key being the custom field name. + """ + return self.session.put(Paths.FOLDER_ID.format(folder_id), json=json) + + +class AttachmentEndpoints(EndpointTemplate): + """Api wrapper for "Attachment" endpoints""" + + def delete_attachment(self, attachment_id): + """Delete an Attachment given an id""" + return self.session.delete(Paths.ATTACH.format(attachment_id)) + + +class EnvironmentEndpoints(EndpointTemplate): + """Api wrapper for "Environment" endpoints""" + + def get_environments(self, project_key): + """ + Retrieve the Environments matching the given projectKey. + + The project must exist. + The project must have Zephyr Scale enabled. + """ + params = {"projectKey": project_key} + return self.session.get(Paths.ENV, params=params) + + def create_environment(self, project_key, name, description=None): + """ + Creates a new Environment. + + The project must exist + The project must have Zephyr Scale enabled + The name must be unique + """ + json = {"projectKey": project_key, + "name": name, + "description": description} + return self.session.post(Paths.ENV, json=json) + + +class AutomationEndpoints(EndpointTemplate): + """Api wrapper for "Automation" endpoints""" + + def create_cycle(self, project_key, file_path, cycle_data=None): + """ + Creates a new Test Cycle based on provided automated test results. + + This endpoint receives a zip file containing one or more Zephyr Scale Test Results File + Format to create the Test Cycle. See Zephyr Scale JUnit Integration + (https://bitbucket.org/smartbeartm4j/tm4j-junit-integration) to learn how + to generate this file. Optionally, you can send a testCycle part in your form data + to customize the created Test Cycle. + """ + return self.session.post_file(Paths.ATM_PRJ_KEY.format(project_key), + file_path=file_path, + data=cycle_data) + + def create_cycle_cucumber(self, project_key, file_path, cycle_data=None): + """ + Creates a new Test Cycle based on provided automated test results. + + This endpoint receives a zip file containing one or more Cucumber Json Output file + (https://relishapp.com/cucumber/cucumber/docs/formatters/json-output-formatter). + Optionally, you can send a testCycle part in your form data to customize + the created Test Cycle. + """ + # return self.session.post_file(Paths.ATM_CUCUMBER.format(project_key), + # file_path=file_path, + # data=cycle_data) + raise NotImplementedError + + def get_testcases_cucumber(self, query): + """ + Retrieve a zip file containing Cucumber Feature Files + that matches the tql passed as parameter. + """ + # return self.session.get(Paths.ATM_CASES, params={"tql": query}) + raise NotImplementedError + + +class ProjectEndpoints(EndpointTemplate): + """Api wrapper for "Project" endpoints""" + + def create_zephyr_project(self, project_key, enabled): + """ + Create a Zephyr Scale project for an existing Jira project. + + If the Zephyr Scale project exists, enable/disable it. + """ + json = {"projectKey": project_key, + "enabled": enabled} + return self.session.post(Paths.PRJ, json=json) + + +class CustomFieldEndpoints(EndpointTemplate): + """Api wrapper for "Custom Field" endpoints""" + + def create_custom_field(self, project_key, name, field_type, category, **kwargs): + """ + Creates a new custom field for test cases, test plans, test runs, test result or folder. + The custom fied name must be unique by project and category. + + Custom fields must have one of these categories: + TEST_PLAN, TEST_RUN, TEST_STEP, TEST_EXECUTION, TEST_CASE or FOLDER. + + Custom fields must have of these types: SINGLE_LINE_TEXT, MULTI_LINE_TEXT, NUMBER, DATE, + SINGLE_CHOICE_SELECT_LIST, CHECKBOX, DECIMAL, MULTI_CHOICE_SELECT_LIST or USER_LIST. + """ + json = {"projectKey": project_key, + "name": name, + "type": field_type, + "category": category} + json.update(kwargs) + return self.session.post(Paths.CFIELD, json=json) + + def create_custom_field_opt(self, custom_field_id, option_name): + """ + Creates a new custom field option for SINGLE_CHOICE_SELECT_LIST or + MULTI_CHOICE_SELECT_LIST custom field. + """ + return self.session.post(Paths.CFIELD_OPT.format(custom_field_id), + json={"name": option_name}) + + +class DeleteExecutionEndpoints(EndpointTemplate): + """Api wrapper for "Delete Execution" endpoints""" + + def delete_execution(self, date): + """ + Starts the deletion process of Test Executions (also known as Test Results). + + This process only removes executions older than 3 months and it will keep + the last test executions. Only Jira Admin users can execute this process. + """ + json = {"deleteExecutionsCreatedBefore": date} + return self.session.post(Paths.DEL_EXEC, json=json) + + def get_status(self): + """Gets the status of the test execution deletion process. + + The statuses can be: IN_PROGRESS, FINISHED or FAILED.""" + return self.session.get(Paths.DEL_EXEC_STATUS) diff --git a/zephyr/scale/server/endpoints/paths.py b/zephyr/scale/server/endpoints/paths.py new file mode 100644 index 0000000..4052aff --- /dev/null +++ b/zephyr/scale/server/endpoints/paths.py @@ -0,0 +1,64 @@ +"""Paths to form Server API URLs""" + + +class ServerPaths: + """ + Zephyr Scale Server API (v1) paths based on: + https://support.smartbear.com/zephyr-scale-server/api-docs/v1/ + """ + # Test Case + CASE = "testcase" + CASE_KEY = "testcase/{}" + CASE_ATTACH = "testcase/{}/attachments" + CASE_LATEST_RES = "testcase/{}/testresult/latest" + CASE_STP_ATTACH = "testcase/{}/step/{}/attachments" + CASE_SEARCH = "testcase/search" + CASE_VERS = "testcase/{}/allVersions" + + # Test Plan + PLAN = "testplan" + PLAN_KEY = "testplan/{}" + PLAN_ATTACH = "testplan/{}/attachments" + PLAN_SEARCH = "testplan/search" + + # Test Run + RUN = "testrun" + RUN_KEY = "testrun/{}" + RUN_ATTACH = "testrun/{}/attachments" + RUN_TEST_RESULT = "testrun/{}/testcase/{}/testresult" + RUN_TEST_RESULTS = "testrun/{}/testresults" + RUN_SEARCH = "testrun/search" + + # Test Result + RES = "testresult" + RES_ATTACH = "testresult/{}/attachments" + RES_STP_ATTACH = "testresult/{}/step/{}/attachments" + + # Issue link + ISSUE_CASES = "issuelink/{}/testcases" + + # Folder + FOLDER = "folder" + FOLDER_ID = "folder/{}" + + # Attachments + ATTACH = "attachments/{}" + + # Environments + ENV = "environments" + + # Automation + ATM_PRJ_KEY = "automation/execution/{}" + ATM_CUCUMBER = "automation/execution/cucumber/{}" + ATM_CASES = "automation/testcases" + + # Project + PRJ = "project" + + # Custom Field + CFIELD = "customfield" + CFIELD_OPT = "customfield/{}/option" + + # Delete Execution + DEL_EXEC = "delete/executiondeletion" + DEL_EXEC_STATUS = "delete/executiondeletion/status" diff --git a/zephyr/scale/server/server_api.py b/zephyr/scale/server/server_api.py new file mode 100644 index 0000000..c52b5c6 --- /dev/null +++ b/zephyr/scale/server/server_api.py @@ -0,0 +1,60 @@ +import logging + +from zephyr.scale.zephyr_session import ZephyrSession +from zephyr.scale.server import endpoints + + +# pylint: disable=missing-function-docstring +class ServerApiWrapper: + """Zephyr Scale Server Api wrapper""" + def __init__(self, session: ZephyrSession): + self.session = session + self.logger = logging.getLogger(__name__) + + @property + def attachments(self): + return endpoints.AttachmentEndpoints(self.session) + + @property + def automation(self): + return endpoints.AutomationEndpoints(self.session) + + @property + def custom_field(self): + return endpoints.CustomFieldEndpoints(self.session) + + @property + def delete_execution(self): + return endpoints.DeleteExecutionEndpoints(self.session) + + @property + def environment(self): + return endpoints.EnvironmentEndpoints(self.session) + + @property + def folder(self): + return endpoints.FolderEndpoints(self.session) + + @property + def issue_link(self): + return endpoints.IssueLinkEndpoints(self.session) + + @property + def project(self): + return endpoints.ProjectEndpoints(self.session) + + @property + def test_cases(self): + return endpoints.TestCaseEndpoints(self.session) + + @property + def test_plans(self): + return endpoints.TestPlanEndpoints(self.session) + + @property + def test_results(self): + return endpoints.TestResultEndpoints(self.session) + + @property + def test_runs(self): + return endpoints.TestRunEndpoints(self.session) diff --git a/zephyr/scale/server/server_defaults.py b/zephyr/scale/server/server_defaults.py new file mode 100644 index 0000000..129742f --- /dev/null +++ b/zephyr/scale/server/server_defaults.py @@ -0,0 +1,19 @@ +""" +This module contains default values for various Server API entities +""" + + +class TestCaseDefaults: + """Class container to store Test Case default values""" + CASE_FIELDS = ("id,projectId,archived,key,name,objective,majorVersion,latestVersion," + "precondition,folder(id,fullName),status,priority,estimatedTime," + "averageTime,componentId,owner,labels,customFieldValues,testScript(id,text," + "steps(index,description,text,expectedResult,testData,attachments," + "customFieldValues,id,stepParameters(id,testCaseParameterId,value),testCase(" + "id,key,name,archived,majorVersion,latestVersion,parameters(id,name," + "defaultValue,index)))),testData,parameters(id,name,defaultValue,index)," + "paramType") + TEST_RESULT_FIELDS = ("testResultStatus(name,i18nKey,color),environment(name),key,userKey," + "assignedTo,jiraVersionId,estimatedTime,executionTime,executionDate," + "automated,testRun,testCase,issueLinks,sprint(name)") + VERSION_FIELDS = "id,majorVersion" diff --git a/zephyr/scale/zephyr_session.py b/zephyr/scale/zephyr_session.py index 22e7f7c..ab233f1 100644 --- a/zephyr/scale/zephyr_session.py +++ b/zephyr/scale/zephyr_session.py @@ -1,12 +1,16 @@ import logging from urllib.parse import urlparse, parse_qs -from requests import Session +from requests import HTTPError, Session INIT_SESSION_MSG = "Initialize session by {}" +class InvalidAuthData(Exception): + """Invalid authentication data provided""" + + class ZephyrSession: """ Zephyr Scale basic session object. @@ -16,11 +20,15 @@ class ZephyrSession: :param username: username :param password: password :param cookies: cookie dict + + :keyword session_attrs: a dict with session attrs to be set as keys and their values """ - def __init__(self, base_url, token=None, username=None, password=None, cookies=None): + def __init__(self, base_url, token=None, username=None, password=None, cookies=None, **kwargs): self.base_url = base_url self._session = Session() + self.logger = logging.getLogger(__name__) + if token: self.logger.debug(INIT_SESSION_MSG.format("token")) self._session.headers.update({"Authorization": f"Bearer {token}"}) @@ -31,12 +39,21 @@ def __init__(self, base_url, token=None, username=None, password=None, cookies=N self.logger.debug(INIT_SESSION_MSG.format("cookies")) self._session.cookies.update(cookies) else: - raise Exception("Insufficient auth data") + raise InvalidAuthData("Insufficient auth data") + + if kwargs.get("session_attrs"): + self._modify_session(**kwargs.get("session_attrs")) def _create_url(self, *args): """Helper for URL creation""" return self.base_url + "/".join(args) + def _modify_session(self, **kwargs): + """Modify requests session with extra arguments""" + self.logger.debug(f"Modify requests session object with {kwargs}") + for session_attr, value in kwargs.items(): + setattr(self._session, session_attr, value) + def _request(self, method: str, endpoint: str, return_raw: bool = False, **kwargs): """General request wrapper with logging and handling response""" self.logger.debug(f"{method.capitalize()} data: endpoint={endpoint} and {kwargs}") @@ -48,7 +65,7 @@ def _request(self, method: str, endpoint: str, return_raw: bool = False, **kwarg if response.text: return response.json() return "" - raise Exception(f"Error {response.status_code}. Response: {response.content}") + raise HTTPError(f"Error {response.status_code}. Response: {response.content}") def get(self, endpoint: str, params: dict = None, **kwargs): """Get request wrapper""" @@ -83,3 +100,16 @@ def get_paginated(self, endpoint, params=None): params_str = urlparse(response.get("next")).query params.update(parse_qs(params_str)) return + + def post_file(self, endpoint: str, file_path: str, to_files=None, **kwargs): + """ + Post wrapper to send a file. Handles single file opening, + sending its content and closing + """ + with open(file_path, "rb") as file: + files = {"file": file} + + if to_files: + files.update(to_files) + + return self._request("post", endpoint, files=files, **kwargs) diff --git a/zephyr/utils/__init__.py b/zephyr/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zephyr/utils/common.py b/zephyr/utils/common.py new file mode 100644 index 0000000..0ed7c0f --- /dev/null +++ b/zephyr/utils/common.py @@ -0,0 +1,15 @@ +"""Common helper functions to use with the package""" + + +def cookie_str_to_dict(cookie_str: str) -> dict: + """ + Function to convert a cookie string from a browser Jira session to cookie dict for auth + + :param cookie_str: a string copied from Jira session in browser + :returns: dict with parsed cookies + """ + cookie_dict = {} + for cookie_substr in cookie_str.split(";"): + _key, _value = cookie_substr.strip().split("=", maxsplit=1) + cookie_dict.update({_key: _value}) + return cookie_dict