From d63da04060e9337d38b290918c93dd850fab05a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Tue, 1 Aug 2023 16:13:00 +0200 Subject: [PATCH 01/13] low level method getting schedules --- .gitignore | 1 + .../api/firewall_proxy.md | 92 +++++++++++++++++ .../run_low_level_methods.py | 4 + panos_upgrade_assurance/firewall_proxy.py | 98 +++++++++++++++++++ 4 files changed, 195 insertions(+) diff --git a/.gitignore b/.gitignore index 5490f6d..593e8f8 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ sidebar.json .python-version requirements.txt .coverage +openssl.cnf \ No newline at end of file diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 88ca108..0bd42c7 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -956,3 +956,95 @@ __Returns__ } ``` +### `FirewallProxy.get_jobs` + +```python +def get_jobs() -> dict +``` + +Get details on all jobs. + +This method retrieves all jobs and their details, this means running, pending, finished, etc. + +The actual API command is `show jobs all`. + +__Returns__ + + +`dict`: All jobs found on the device, indexed by the ID of a job. + +```python showLineNumbers title="Sample output" +{'1': {'description': None, + 'details': {'line': ['ID population failed', + 'Client logrcvr registered in the middle of a ' + 'commit/validate. Aborting current ' + 'commit/validate.', + 'Commit failed', + 'Failed to commit policy to device']}, + 'positionInQ': '0', + 'progress': '100', + 'queued': 'NO', + 'result': 'FAIL', + 'status': 'FIN', + 'stoppable': 'no', + 'tdeq': '00:28:32', + 'tenq': '2023/08/01 00:28:32', + 'tfin': '2023/08/01 00:28:36', + 'type': 'AutoCom', + 'user': None, + 'warnings': None}, +'2': {'description': None, + 'details': {'line': ['Configuration committed successfully', + 'Successfully committed last configuration']}, + 'positionInQ': '0', + 'progress': '100', + 'queued': 'NO', + 'result': 'OK', + 'status': 'FIN', + 'stoppable': 'no', + 'tdeq': '00:28:40', + 'tenq': '2023/08/01 00:28:40', + 'tfin': '2023/08/01 00:29:20', + 'type': 'AutoCom', + 'user': None, + 'warnings': None}, +'3': {'description': None, + 'details': None, + 'positionInQ': '0', + 'progress': '30', + 'queued': 'NO', + 'result': 'PEND', + 'status': 'ACT', + 'stoppable': 'yes', + 'tdeq': '00:58:59', + 'tenq': '2023/08/01 00:58:59', + 'tfin': None, + 'type': 'Downld', + 'user': None, + 'warnings': None}} +``` + +### `FirewallProxy.get_update_schedules` + +```python +def get_update_schedules() -> dict +``` + +Get schedules for all dynamic updates. + +This method gets all scheduled running on a device. This includes the ones pushed from Panorama. + +The actual API command is `devices/entry/deviceconfig/system/update-schedule`. + +__Returns__ + + +`dict`: All dynamic updates schedules, key is the entity type to update, like: threats, wildfire, etc. + +```python showLineNumbers title="Sample output" +{'threats': {'recurring': {'weekly': {'action': 'download-only', + 'at': '01:02', + 'day-of-week': 'wednesday'}}}, +'wildfire': {'recurring': {'real-time': None}}} +``` + diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index 2cfe1d0..23418f4 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -117,4 +117,8 @@ print(f"\n certificates: {firewall.get_certificates()}") + print(f"\n jobs: {firewall.get_jobs()}") + + print(f"\n dynamic schedules: {firewall.get_update_schedules()}") + print() diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index 2bf4880..484ec60 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -1093,3 +1093,101 @@ def get_certificates(self) -> dict: result[cert_name] = certificate return result + + def get_jobs(self) -> dict: + """Get details on all jobs. + + This method retrieves all jobs and their details, this means running, pending, finished, etc. + + The actual API command is `show jobs all`. + + # Returns + + dict: All jobs found on the device, indexed by the ID of a job. + + ```python showLineNumbers title="Sample output" + {'1': {'description': None, + 'details': {'line': ['ID population failed', + 'Client logrcvr registered in the middle of a ' + 'commit/validate. Aborting current ' + 'commit/validate.', + 'Commit failed', + 'Failed to commit policy to device']}, + 'positionInQ': '0', + 'progress': '100', + 'queued': 'NO', + 'result': 'FAIL', + 'status': 'FIN', + 'stoppable': 'no', + 'tdeq': '00:28:32', + 'tenq': '2023/08/01 00:28:32', + 'tfin': '2023/08/01 00:28:36', + 'type': 'AutoCom', + 'user': None, + 'warnings': None}, + '2': {'description': None, + 'details': {'line': ['Configuration committed successfully', + 'Successfully committed last configuration']}, + 'positionInQ': '0', + 'progress': '100', + 'queued': 'NO', + 'result': 'OK', + 'status': 'FIN', + 'stoppable': 'no', + 'tdeq': '00:28:40', + 'tenq': '2023/08/01 00:28:40', + 'tfin': '2023/08/01 00:29:20', + 'type': 'AutoCom', + 'user': None, + 'warnings': None}, + '3': {'description': None, + 'details': None, + 'positionInQ': '0', + 'progress': '30', + 'queued': 'NO', + 'result': 'PEND', + 'status': 'ACT', + 'stoppable': 'yes', + 'tdeq': '00:58:59', + 'tenq': '2023/08/01 00:58:59', + 'tfin': None, + 'type': 'Downld', + 'user': None, + 'warnings': None}} + ``` + + """ + jobs = self.op_parser(cmd="show jobs all") + results = dict() + + for job in jobs["job"]: + jid = job["id"] + job.pop("id") + results[jid] = job + + return results + + def get_update_schedules(self) -> dict: + """Get schedules for all dynamic updates. + + This method gets all scheduled running on a device. This includes the ones pushed from Panorama. + + The actual API command is `devices/entry/deviceconfig/system/update-schedule`. + + # Returns + + dict: All dynamic updates schedules, key is the entity type to update, like: threats, wildfire, etc. + + ```python showLineNumbers title="Sample output" + {'threats': {'recurring': {'weekly': {'action': 'download-only', + 'at': '01:02', + 'day-of-week': 'wednesday'}}}, + 'wildfire': {'recurring': {'real-time': None}}} + ``` + + """ + schedules = self.op_parser( + cmd="devices/entry/deviceconfig/system/update-schedule", + cmd_in_xml=True) + + return schedules["update-schedule"] From 5f64be25003e02997fd4c4e86dba88092e152358 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Wed, 2 Aug 2023 11:47:15 +0200 Subject: [PATCH 02/13] wrong branch code --- .../api/check_firewall.md | 34 ++++++ .../run_low_level_methods.py | 48 ++++---- .../readiness_checks/run_readiness_checks.py | 109 ++---------------- panos_upgrade_assurance/check_firewall.py | 109 ++++++++++++++++++ panos_upgrade_assurance/utils.py | 5 + 5 files changed, 179 insertions(+), 126 deletions(-) diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index 1ef0c0f..fed8e6e 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -459,6 +459,40 @@ __Returns__ * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the certificate's properties (installed or required) are not supported. +### `CheckFirewall.check_non_finished_jobs` + +```python +def check_non_finished_jobs() -> CheckResult +``` + +Check for any job with status different than FIN. + +__Returns__ + + +`CheckResult`: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking value of: + +* [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when all jobs are in FIN state. +* [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about the 1st job found with status different than FIN (job ID and the actual + status). + +### `CheckFirewall._calculate_time_distance` + +```python +def _calculate_time_distance(schedule_type: str, schedule: dict) -> (int, str) +``` + + + +### `CheckFirewall.check_scheduled_updates` + +```python +def check_scheduled_updates(test_window: int = 60) -> CheckResult +``` + + + ### `CheckFirewall.get_content_db_version` ```python diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index 23418f4..3a17946 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -74,50 +74,50 @@ hostname=address, api_password=password, api_username=username, vsys=vsys ) - p_config = firewall.is_panorama_configured() - print(f"\n panorama configured: {p_config}") + # p_config = firewall.is_panorama_configured() + # print(f"\n panorama configured: {p_config}") - if p_config: - print(f"\n panorama connected: {firewall.is_panorama_connected()}") + # if p_config: + # print(f"\n panorama connected: {firewall.is_panorama_connected()}") - print(f"\n pending changed: {firewall.is_pending_changes()}") - print(f"\n full commit pending: {firewall.is_full_commit_required()}") + # print(f"\n pending changed: {firewall.is_pending_changes()}") + # print(f"\n full commit pending: {firewall.is_full_commit_required()}") - print(f"\n ha configuration\n{firewall.get_ha_configuration()}") + # print(f"\n ha configuration\n{firewall.get_ha_configuration()}") - print(f"\n nic statuses\n{firewall.get_nics()}") + # print(f"\n nic statuses\n{firewall.get_nics()}") - print(f"\n licenses information\n{firewall.get_licenses()}") + # print(f"\n licenses information\n{firewall.get_licenses()}") - print(f"\n support license information\n{firewall.get_support_license()}") + # print(f"\n support license information\n{firewall.get_support_license()}") - print(f"\n routes information\n{firewall.get_routes()}") + # print(f"\n routes information\n{firewall.get_routes()}") - print(f"\n arp entries information\n{firewall.get_arp_table()}") + # print(f"\n arp entries information\n{firewall.get_arp_table()}") - print(f"\n session information\n{firewall.get_session_stats()}") + # print(f"\n session information\n{firewall.get_session_stats()}") - print(f"\n session information\n{firewall.get_sessions()}") + # print(f"\n session information\n{firewall.get_sessions()}") - print(f"\n tunnels information\n{firewall.get_tunnels()}") + # print(f"\n tunnels information\n{firewall.get_tunnels()}") - print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") + # print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") - print(f"\n content DB version: {firewall.get_content_db_version()}") + # print(f"\n content DB version: {firewall.get_content_db_version()}") - print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") + # print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") - print(f"\n disk utilization: {firewall.get_disk_utilization()}") + # print(f"\n disk utilization: {firewall.get_disk_utilization()}") - print(f"\n available image versions: {firewall.get_available_image_data()}") + # print(f"\n available image versions: {firewall.get_available_image_data()}") - print(f"\n management plane clock: {firewall.get_mp_clock()}") + # print(f"\n management plane clock: {firewall.get_mp_clock()}") - print(f"\n data plane clock: {firewall.get_dp_clock()}") + # print(f"\n data plane clock: {firewall.get_dp_clock()}") - print(f"\n certificates: {firewall.get_certificates()}") + # print(f"\n certificates: {firewall.get_certificates()}") - print(f"\n jobs: {firewall.get_jobs()}") + # print(f"\n jobs: {firewall.get_jobs()}") print(f"\n dynamic schedules: {firewall.get_update_schedules()}") diff --git a/examples/readiness_checks/run_readiness_checks.py b/examples/readiness_checks/run_readiness_checks.py index b0eb145..408251f 100755 --- a/examples/readiness_checks/run_readiness_checks.py +++ b/examples/readiness_checks/run_readiness_checks.py @@ -8,109 +8,19 @@ from getpass import getpass if __name__ == "__main__": - argparser = ArgumentParser( - add_help=True, - description="A simple script running upgrade assurance checks on device.", - ) - argparser.add_argument( - "-d, --device", - type=str, - dest="device", - metavar="ADDRESS", - help="Device (Panorama or Firewall) address", - required=True, - ) - argparser.add_argument( - "-u, --username", - type=str, - dest="username", - metavar="USER", - help="Username used to connect to a device", - required=True, - ) - argparser.add_argument( - "-p, --password", - type=str, - dest="password", - metavar="PASS", - help="Password matching the account specified in --username", - default=None, - ) - argparser.add_argument( - "-s, --serial", - type=str, - dest="serial", - metavar="SERIAL", - help="Serial number of a device, used when --device is pointing to a Panorama", - default=None, - ) - argparser.add_argument( - "-v, --vsys", - type=str, - dest="vsys", - metavar="VSYS", - help="Name of a VSYS to connect to", - default=None, + address = "13.74.81.226" + password = "pek_vrZsxl2emqud" + firewall = FirewallProxy( + hostname=address, api_password=password, api_username="panadmin" ) - args = argparser.parse_args() - - address = args.device - username = args.username - password = args.password - if not password: - password = getpass(f"{username} password: ") - - serial = args.serial - vsys = args.vsys - - if serial: - panorama = Panorama( - hostname=address, api_password=password, api_username=username - ) - firewall = FirewallProxy(serial=serial) - panorama.add(firewall) - else: - firewall = FirewallProxy( - hostname=address, api_password=password, api_username=username, vsys=vsys - ) - check_node = CheckFirewall(firewall) checks = [ - # "all", - # "panorama", - # "ntp_sync", - # "candidate_config", - # "active_support", - # # checks below have optional configuration - # {"ha": {"skip_config_sync": True, "ignore_non_functional": True}}, - # {"content_version": {"version": "8635-7675"}}, - # {"expired_licenses": {"skip_licenses": ["Threat Prevention"]}}, - # {"planes_clock_sync": {"diff_threshold": 2}}, - # {"free_disk_space": {"image_version": "10.1.6-h6"}}, - { - "certificates_requirements": { - "ecdsa": { - "hash_method": "sha512", - }, - "rsa": { - "hash_method": "sha1", - "key_size": 4098, - } - } - }, - # checks below require additional configuration - # { - # "session_exist": { - # "source": "134.238.135.137", - # "destination": "10.1.0.4", - # "dest_port": "80", - # } - # }, - # {"arp_entry_exist": {"ip": "10.0.1.1"}}, - # {"ip_sec_tunnel_status": {"tunnel_name": "ipsec_tun"}}, + {"dynamic_updates": { + "test_window": 30 + }} ] check_readiness = check_node.run_readiness_checks( @@ -118,8 +28,3 @@ # report_style=True ) printer(check_readiness) - node_state = check_node.check_is_ha_active( - skip_config_sync=True, - ignore_non_functional=True - ) - print(bool(node_state), node_state) diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index b28ac06..5c7044d 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -11,6 +11,7 @@ SnapType, CheckStatus, SupportedHashes, + distance_in_minutes, ) from panos_upgrade_assurance.firewall_proxy import FirewallProxy from panos_upgrade_assurance import exceptions @@ -94,12 +95,16 @@ def __init__(self, node: FirewallProxy, skip_force_locale: Optional[bool] = Fals CheckType.FREE_DISK_SPACE: self.check_free_disk_space, CheckType.MP_DP_CLOCK_SYNC: self.check_mp_dp_sync, CheckType.CERTS: self.check_ssl_cert_requirements, + CheckType.JOBS: self.check_non_finished_jobs, + CheckType.UPDATES: self.check_scheduled_updates, } if not skip_force_locale: locale.setlocale( locale.LC_ALL, "en_US.UTF-8" ) # force locale for datetime string parsing when non-English locale is set on host + self._now = datetime.now() + def check_pending_changes(self) -> CheckResult: """Check if there are pending changes on device. @@ -870,6 +875,110 @@ def check_ssl_cert_requirements(self, rsa: dict = {}, ecdsa: dict = {}) -> Check result.status = CheckStatus.SUCCESS return result + def check_non_finished_jobs(self) -> CheckResult: + """Check for any job with status different than FIN. + + # Returns + + CheckResult: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking \ + value of: + + * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when all jobs are in FIN state. + * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about the 1st job found with status different than FIN (job ID and the actual + status). + + """ + result = CheckResult() + + all_jobs = self._node.get_jobs() + + for jid, job in all_jobs.items(): + if job["status"] != "FIN": + result.reason = f"At least one job (ID={jid}) is not in finished state (state={job['status']})." + return result + + result.status = CheckStatus.SUCCESS + return result + + def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, str): + """ + """ + time_distance = 0 + details = "unsupported schedule type" + + if schedule_type == 'none': + time_distance = -1 + details = "disabled" + elif schedule_type == 'daily': + pass + elif schedule_type == 'hourly': + time_distance = 60 + details = 'every hour' + elif schedule_type == 'weekly': + pass + elif schedule_type.split('-')[0] == 'every': + if schedule_type.split('-')[1] == 'min': + time_distance = 1 + details = 'every minute' + elif schedule_type.split('-')[1] == 'hour': + time_distance = 60 + details = 'hourly' + elif schedule_type.split('-')[1].isnumeric(): + time_distance = int(schedule_type.split('-')[1]) + details = f'every {time_distance} minutes' + else: + raise exceptions.MalformedResponseException(f'Unknown schedule type: {schedule_type}.') + elif schedule_type == 'real-time': + details = 'unpredictable (real-time)' + else: + raise exceptions.MalformedResponseException(f'Unknown schedule type: {schedule_type}.') + + return time_distance, details + + def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: + """ + """ + if not isinstance(test_window, int): + raise exceptions.WrongDataTypeException(f"The test_windows parameter should be of type , got {type(test_window)} instead.") + + result = CheckResult() + + schedules = self._node.get_update_schedules() + if not schedules: + result.status = CheckStatus.SKIPPED + result.reason = "No scheduled job present on the device." + return result + + if test_window < 60: + result.status = CheckStatus.ERROR + result.reason = "Schedules test window is below the supported, safe minimum of 60 minutes." + return result + + for name, schedule in schedules.items(): + if "recurring" not in schedule.keys(): + raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration, missing a schedule..") + if len(schedule) != 1: + raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration: {schedule}") + + schedules_in_window = [] + for name, schedule in schedules.items(): + schedule_details = schedule['recurring'] + time_distance, details = self._calculate_time_distance( + schedule_type=schedule_details.keys()[0], + schedule=schedule_details.values()[0] + ) + if time_distance >= 0 and time_distance <= test_window: + schedules_in_window.append(f"{name}: {details}") + + if schedules_in_window: + result.status = f"Following schedules fall into test window: {', '.join(schedules_in_window)}." + return result + + result.status = CheckStatus.SUCCESS + + return result + def get_content_db_version(self) -> Dict[str, str]: """Get Content DB version. diff --git a/panos_upgrade_assurance/utils.py b/panos_upgrade_assurance/utils.py index 0e39da9..73e5840 100644 --- a/panos_upgrade_assurance/utils.py +++ b/panos_upgrade_assurance/utils.py @@ -28,6 +28,8 @@ class CheckType: FREE_DISK_SPACE = "free_disk_space" MP_DP_CLOCK_SYNC = "planes_clock_sync" CERTS = "certificates_requirements" + JOBS = "jobs" + UPDATES = "dynamic_updates" class SnapType: @@ -334,3 +336,6 @@ def printer(report: dict, indent_level: int = 0) -> None: # pragma: no cover - printer(v, indent_level + 1) else: print(f"{delim * indent_level} {k}: {v}") + +def distance_in_minutes(**kwargs) -> int: + pass \ No newline at end of file From a2dd1ad3b942f03da5a947a4aa91530ea95572d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Thu, 3 Aug 2023 15:27:18 +0200 Subject: [PATCH 03/13] working code --- .../api/firewall_proxy.md | 2 +- .../readiness_checks/run_readiness_checks.py | 7 +- panos_upgrade_assurance/check_firewall.py | 74 ++++++++++++------- panos_upgrade_assurance/firewall_proxy.py | 47 +++++++----- 4 files changed, 82 insertions(+), 48 deletions(-) diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 0bd42c7..58322d0 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -854,7 +854,7 @@ __Returns__ ### `FirewallProxy.get_mp_clock` ```python -def get_mp_clock() -> dict +def get_mp_clock() -> datetime ``` Get the clock information from management plane. diff --git a/examples/readiness_checks/run_readiness_checks.py b/examples/readiness_checks/run_readiness_checks.py index 408251f..a270db8 100755 --- a/examples/readiness_checks/run_readiness_checks.py +++ b/examples/readiness_checks/run_readiness_checks.py @@ -6,6 +6,7 @@ from panos.panorama import Panorama from argparse import ArgumentParser from getpass import getpass +from pprint import pprint if __name__ == "__main__": @@ -18,11 +19,15 @@ check_node = CheckFirewall(firewall) checks = [ + # "planes_clock_sync", {"dynamic_updates": { - "test_window": 30 + "test_window": 500 }} ] + pprint(firewall.get_update_schedules()) + pprint(firewall.get_mp_clock()) + # exit() check_readiness = check_node.run_readiness_checks( checks_configuration=checks, # report_style=True diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index 5c7044d..b0d78b5 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -1,7 +1,8 @@ from typing import Optional, Union, List, Dict -from math import ceil -from datetime import datetime +from math import ceil, floor +from datetime import datetime, timedelta import locale +import time from panos_upgrade_assurance.utils import ( CheckResult, @@ -103,8 +104,6 @@ def __init__(self, node: FirewallProxy, skip_force_locale: Optional[bool] = Fals locale.LC_ALL, "en_US.UTF-8" ) # force locale for datetime string parsing when non-English locale is set on host - self._now = datetime.now() - def check_pending_changes(self) -> CheckResult: """Check if there are pending changes on device. @@ -737,16 +736,7 @@ def check_mp_dp_sync(self, diff_threshold: int = 0) -> CheckResult: mp_clock = self._node.get_mp_clock() dp_clock = self._node.get_dp_clock() - mp_dt = datetime.strptime( - f"{mp_clock['year']}-{mp_clock['month']}-{mp_clock['day']} {mp_clock['time']}", - "%Y-%b-%d %H:%M:%S", - ) - dp_dt = datetime.strptime( - f"{dp_clock['year']}-{dp_clock['month']}-{dp_clock['day']} {dp_clock['time']}", - "%Y-%b-%d %H:%M:%S", - ) - - time_fluctuation = abs((mp_dt - dp_dt).total_seconds()) + time_fluctuation = abs((mp_clock - dp_clock).total_seconds()) if time_fluctuation > diff_threshold: result.reason = f"The data plane clock and management clock are different by {time_fluctuation} seconds." else: @@ -907,16 +897,35 @@ def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, time_distance = 0 details = "unsupported schedule type" - if schedule_type == 'none': - time_distance = -1 - details = "disabled" - elif schedule_type == 'daily': - pass + if schedule_type == 'daily': + occurrence = schedule['at'] + next_occurrence = datetime.strptime(f"{str(self._mp_now.date())} {occurrence}", '%Y-%m-%d %H:%M') + + if self._mp_now > next_occurrence: + next_occurrence = next_occurrence + timedelta(days=1) + diff = next_occurrence - self._mp_now + time_distance = (floor(diff.total_seconds()/60)) + details = f"at {next_occurrence.time()}" + elif schedule_type == 'hourly': time_distance = 60 details = 'every hour' elif schedule_type == 'weekly': - pass + occurrence_time = schedule['at'] + occurrence_day = schedule['day-of-week'] + occurrence_wday = time.strptime(occurrence_day, "%A").tm_wday + now_wday = self._mp_now.weekday() + + diff_days = (0 if occurrence_wday >= now_wday else 7) + occurrence_wday - now_wday + next_occurrence_date = (self._mp_now + timedelta(days=diff_days)).date() + next_occurrence = datetime.strptime(f"{str(next_occurrence_date)} {occurrence_time}", '%Y-%m-%d %H:%M') + + if self._mp_now > next_occurrence: + next_occurrence = next_occurrence + timedelta(days=7) + diff = next_occurrence - self._mp_now + time_distance = (floor(diff.total_seconds()/60)) + details = f"in {str(diff).split('.')[0]}" + elif schedule_type.split('-')[0] == 'every': if schedule_type.split('-')[1] == 'min': time_distance = 1 @@ -944,6 +953,8 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: result = CheckResult() + self._mp_now = self._node.get_mp_clock() + schedules = self._node.get_update_schedules() if not schedules: result.status = CheckStatus.SKIPPED @@ -954,6 +965,10 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: result.status = CheckStatus.ERROR result.reason = "Schedules test window is below the supported, safe minimum of 60 minutes." return result + if test_window > 10080: + result.status = CheckStatus.ERROR + result.reason = "Schedules test window is set to over 1 week. This test will always fail." + return result for name, schedule in schedules.items(): if "recurring" not in schedule.keys(): @@ -964,15 +979,20 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: schedules_in_window = [] for name, schedule in schedules.items(): schedule_details = schedule['recurring'] - time_distance, details = self._calculate_time_distance( - schedule_type=schedule_details.keys()[0], - schedule=schedule_details.values()[0] - ) - if time_distance >= 0 and time_distance <= test_window: - schedules_in_window.append(f"{name}: {details}") + + if 'sync-to-peer' in schedule_details: + schedule_details.pop('sync-to-peer') + + if 'none' not in schedule_details: + time_distance, details = self._calculate_time_distance( + schedule_type=next(iter(schedule_details.keys())), + schedule=next(iter(schedule_details.values())) + ) + if time_distance <= test_window: + schedules_in_window.append(f"{name} ({details})") if schedules_in_window: - result.status = f"Following schedules fall into test window: {', '.join(schedules_in_window)}." + result.reason = f"Following schedules fall into test window: {', '.join(schedules_in_window)}." return result result.status = CheckStatus.SUCCESS diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index 484ec60..0de4b75 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -6,6 +6,7 @@ from pan.xapi import PanXapiError from panos_upgrade_assurance import exceptions from math import floor +from datetime import datetime class FirewallProxy(Firewall): @@ -963,7 +964,7 @@ def get_available_image_data(self) -> dict: return result - def get_mp_clock(self) -> dict: + def get_mp_clock(self) -> datetime: """Get the clock information from management plane. The actual API command is `show clock`. @@ -985,17 +986,21 @@ def get_mp_clock(self) -> dict: """ time_string = self.op_parser(cmd="show clock") - time_dict = time_string.split() - result = { - "time": time_dict[3], - "tz": time_dict[4], - "day": time_dict[2], - "month": time_dict[1], - "year": time_dict[5], - "day_of_week": time_dict[0], + time_parsed = time_string.split() + time_dict = { + "time": time_parsed[3], + "tz": time_parsed[4], + "day": time_parsed[2], + "month": time_parsed[1], + "year": time_parsed[5], + "day_of_week": time_parsed[0], } + dt = datetime.strptime( + f"{time_dict['year']}-{time_dict['month']}-{time_dict['day']} {time_dict['time']}", + "%Y-%b-%d %H:%M:%S", + ) - return result + return dt def get_dp_clock(self) -> dict: """Get the clock information from data plane. @@ -1020,17 +1025,21 @@ def get_dp_clock(self) -> dict: """ response = self.op_parser(cmd="show clock more") time_string = dict(response)["member"] - time_dict = time_string.split() - result = { - "time": time_dict[5], - "tz": time_dict[6], - "day": time_dict[4], - "month": time_dict[3], - "year": time_dict[7], - "day_of_week": time_dict[2], + time_parsed = time_string.split() + time_dict = { + "time": time_parsed[5], + "tz": time_parsed[6], + "day": time_parsed[4], + "month": time_parsed[3], + "year": time_parsed[7], + "day_of_week": time_parsed[2], } + dt = datetime.strptime( + f"{time_dict['year']}-{time_dict['month']}-{time_dict['day']} {time_dict['time']}", + "%Y-%b-%d %H:%M:%S", + ) - return result + return dt def get_certificates(self) -> dict: """Get information about certificates installed on a device. From 9c5cbc97fe6e32a33be3055f76e5d41496a26002 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Thu, 3 Aug 2023 16:21:21 +0200 Subject: [PATCH 04/13] working code with failing or missing tests --- Makefile | 2 +- coverage.xml | 926 ++++++++++++++++++ .../api/check_firewall.md | 47 + .../api/firewall_proxy.md | 26 +- .../run_low_level_methods.py | 48 +- .../readiness_checks/run_readiness_checks.py | 116 ++- panos_upgrade_assurance/check_firewall.py | 114 ++- panos_upgrade_assurance/firewall_proxy.py | 29 +- panos_upgrade_assurance/utils.py | 3 - 9 files changed, 1187 insertions(+), 124 deletions(-) create mode 100644 coverage.xml diff --git a/Makefile b/Makefile index 66b565c..dbe2e0e 100644 --- a/Makefile +++ b/Makefile @@ -29,4 +29,4 @@ check_line_length: done < "$$FILE"; \ done -all: lint format security test_coverage documentation +all: format lint security test_coverage documentation diff --git a/coverage.xml b/coverage.xml new file mode 100644 index 0000000..8a30cc9 --- /dev/null +++ b/coverage.xml @@ -0,0 +1,926 @@ + + + + + + /Users/lpawlega/Git/public_code/pan-os-upgrade-assurance/panos_upgrade_assurance + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index fed8e6e..2e7db37 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -483,7 +483,30 @@ __Returns__ def _calculate_time_distance(schedule_type: str, schedule: dict) -> (int, str) ``` +A method that calculates the time distance between two `datetime` objects. +:::note +This method is used only by [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) method and it expects some information +to be already available. +::: + +__Parameters__ + + +- __schedule_type__ (`str`): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, + `real-time`. +- __schedule__ (`dict`): Value of the `recurring` key in the API response, see [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) + documentation for details. + +__Raises__ + + +- `MalformedResponseException`: Thrown then the `schedule_type` is not recognizable. + +__Returns__ + + +`tuple(int, str)`: A tuple containing the calculated time difference (in minutes) and human-readable description. ### `CheckFirewall.check_scheduled_updates` @@ -491,7 +514,31 @@ def _calculate_time_distance(schedule_type: str, schedule: dict) -> (int, str) def check_scheduled_updates(test_window: int = 60) -> CheckResult ``` +Check if any Dynamic Update job is scheduled to run within the specified time window. + +__Parameters__ + + +- __test_window__ (`int, optional`): (defaults to 60 minutes). A time window in minutes to look for an update job occurrence. + Has to be a value between `60` and `10080` (1 week equivalent). The time window is calculated based on the device's + local time (taken from the management plane). + +__Raises__ + + +- `MalformedResponseException`: Thrown in case API response does not meet expectations. +__Returns__ + + +`CheckResult`: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking value of: + +* [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there is no update job + planned within the test time window. +* [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about the planned jobs with next occurrence time provided if possible. +* [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the `test_window` parameter + does not meet criteria. ### `CheckFirewall.get_content_db_version` diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 58322d0..57321a5 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -864,18 +864,7 @@ The actual API command is `show clock`. __Returns__ -`dict`: The clock information represented as a dictionary. - -```python showLineNumbers title="Sample output" -{ - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' -} -``` +`datetime`: The clock information represented as a `datetime` object. ### `FirewallProxy.get_dp_clock` @@ -890,18 +879,7 @@ The actual API command is `show clock more`. __Returns__ -`dict`: The clock information represented as a dictionary. - -```python showLineNumbers title="Sample output" -{ - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' -} -``` +`datetime`: The clock information represented as a `datetime` object. ### `FirewallProxy.get_certificates` diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index 3a17946..23418f4 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -74,50 +74,50 @@ hostname=address, api_password=password, api_username=username, vsys=vsys ) - # p_config = firewall.is_panorama_configured() - # print(f"\n panorama configured: {p_config}") + p_config = firewall.is_panorama_configured() + print(f"\n panorama configured: {p_config}") - # if p_config: - # print(f"\n panorama connected: {firewall.is_panorama_connected()}") + if p_config: + print(f"\n panorama connected: {firewall.is_panorama_connected()}") - # print(f"\n pending changed: {firewall.is_pending_changes()}") - # print(f"\n full commit pending: {firewall.is_full_commit_required()}") + print(f"\n pending changed: {firewall.is_pending_changes()}") + print(f"\n full commit pending: {firewall.is_full_commit_required()}") - # print(f"\n ha configuration\n{firewall.get_ha_configuration()}") + print(f"\n ha configuration\n{firewall.get_ha_configuration()}") - # print(f"\n nic statuses\n{firewall.get_nics()}") + print(f"\n nic statuses\n{firewall.get_nics()}") - # print(f"\n licenses information\n{firewall.get_licenses()}") + print(f"\n licenses information\n{firewall.get_licenses()}") - # print(f"\n support license information\n{firewall.get_support_license()}") + print(f"\n support license information\n{firewall.get_support_license()}") - # print(f"\n routes information\n{firewall.get_routes()}") + print(f"\n routes information\n{firewall.get_routes()}") - # print(f"\n arp entries information\n{firewall.get_arp_table()}") + print(f"\n arp entries information\n{firewall.get_arp_table()}") - # print(f"\n session information\n{firewall.get_session_stats()}") + print(f"\n session information\n{firewall.get_session_stats()}") - # print(f"\n session information\n{firewall.get_sessions()}") + print(f"\n session information\n{firewall.get_sessions()}") - # print(f"\n tunnels information\n{firewall.get_tunnels()}") + print(f"\n tunnels information\n{firewall.get_tunnels()}") - # print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") + print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") - # print(f"\n content DB version: {firewall.get_content_db_version()}") + print(f"\n content DB version: {firewall.get_content_db_version()}") - # print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") + print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") - # print(f"\n disk utilization: {firewall.get_disk_utilization()}") + print(f"\n disk utilization: {firewall.get_disk_utilization()}") - # print(f"\n available image versions: {firewall.get_available_image_data()}") + print(f"\n available image versions: {firewall.get_available_image_data()}") - # print(f"\n management plane clock: {firewall.get_mp_clock()}") + print(f"\n management plane clock: {firewall.get_mp_clock()}") - # print(f"\n data plane clock: {firewall.get_dp_clock()}") + print(f"\n data plane clock: {firewall.get_dp_clock()}") - # print(f"\n certificates: {firewall.get_certificates()}") + print(f"\n certificates: {firewall.get_certificates()}") - # print(f"\n jobs: {firewall.get_jobs()}") + print(f"\n jobs: {firewall.get_jobs()}") print(f"\n dynamic schedules: {firewall.get_update_schedules()}") diff --git a/examples/readiness_checks/run_readiness_checks.py b/examples/readiness_checks/run_readiness_checks.py index a270db8..bf25dde 100755 --- a/examples/readiness_checks/run_readiness_checks.py +++ b/examples/readiness_checks/run_readiness_checks.py @@ -6,30 +6,122 @@ from panos.panorama import Panorama from argparse import ArgumentParser from getpass import getpass -from pprint import pprint if __name__ == "__main__": + argparser = ArgumentParser( + add_help=True, + description="A simple script running upgrade assurance checks on device.", + ) - address = "13.74.81.226" - password = "pek_vrZsxl2emqud" - firewall = FirewallProxy( - hostname=address, api_password=password, api_username="panadmin" + argparser.add_argument( + "-d, --device", + type=str, + dest="device", + metavar="ADDRESS", + help="Device (Panorama or Firewall) address", + required=True, + ) + argparser.add_argument( + "-u, --username", + type=str, + dest="username", + metavar="USER", + help="Username used to connect to a device", + required=True, + ) + argparser.add_argument( + "-p, --password", + type=str, + dest="password", + metavar="PASS", + help="Password matching the account specified in --username", + default=None, + ) + argparser.add_argument( + "-s, --serial", + type=str, + dest="serial", + metavar="SERIAL", + help="Serial number of a device, used when --device is pointing to a Panorama", + default=None, + ) + argparser.add_argument( + "-v, --vsys", + type=str, + dest="vsys", + metavar="VSYS", + help="Name of a VSYS to connect to", + default=None, ) + args = argparser.parse_args() + + address = args.device + username = args.username + password = args.password + if not password: + password = getpass(f"{username} password: ") + + serial = args.serial + vsys = args.vsys + + if serial: + panorama = Panorama( + hostname=address, api_password=password, api_username=username + ) + firewall = FirewallProxy(serial=serial) + panorama.add(firewall) + else: + firewall = FirewallProxy( + hostname=address, api_password=password, api_username=username, vsys=vsys + ) + check_node = CheckFirewall(firewall) checks = [ - # "planes_clock_sync", - {"dynamic_updates": { - "test_window": 500 - }} + "all", + "panorama", + "ntp_sync", + "candidate_config", + "check_non_finished_jobs", + "active_support", + # checks below have optional configuration + { + "certificates_requirements": { + "ecdsa": { + "hash_method": "sha512", + }, + "rsa": { + "hash_method": "sha1", + "key_size": 4098, + } + } + }, + {"content_version": {"version": "8635-7675"}}, + {"dynamic_updates": {"test_window": 500}}, + {"expired_licenses": {"skip_licenses": ["Threat Prevention"]}}, + {"ha": {"skip_config_sync": True, "ignore_non_functional": True}}, + {"free_disk_space": {"image_version": "10.1.6-h6"}}, + {"planes_clock_sync": {"diff_threshold": 2}}, + # checks below require additional configuration + { + "session_exist": { + "source": "134.238.135.137", + "destination": "10.1.0.4", + "dest_port": "80", + } + }, + {"arp_entry_exist": {"ip": "10.0.1.1"}}, + {"ip_sec_tunnel_status": {"tunnel_name": "ipsec_tun"}}, ] - pprint(firewall.get_update_schedules()) - pprint(firewall.get_mp_clock()) - # exit() check_readiness = check_node.run_readiness_checks( checks_configuration=checks, # report_style=True ) printer(check_readiness) + node_state = check_node.check_is_ha_active( + skip_config_sync=True, + ignore_non_functional=True + ) + print(bool(node_state), node_state) diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index b0d78b5..c68b7be 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -12,7 +12,6 @@ SnapType, CheckStatus, SupportedHashes, - distance_in_minutes, ) from panos_upgrade_assurance.firewall_proxy import FirewallProxy from panos_upgrade_assurance import exceptions @@ -892,64 +891,110 @@ def check_non_finished_jobs(self) -> CheckResult: return result def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, str): - """ + """A method that calculates the time distance between two `datetime` objects. + + :::note + This method is used only by [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) method and it expects some information + to be already available. + ::: + + # Parameters + + schedule_type (str): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, + `real-time`. + schedule (dict): Value of the `recurring` key in the API response, see [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) + documentation for details. + + # Raises + + MalformedResponseException: Thrown then the `schedule_type` is not recognizable. + + # Returns + + tuple(int, str): A tuple containing the calculated time difference (in minutes) and human-readable description. + """ time_distance = 0 details = "unsupported schedule type" - if schedule_type == 'daily': - occurrence = schedule['at'] - next_occurrence = datetime.strptime(f"{str(self._mp_now.date())} {occurrence}", '%Y-%m-%d %H:%M') + if schedule_type == "daily": + occurrence = schedule["at"] + next_occurrence = datetime.strptime(f"{str(self._mp_now.date())} {occurrence}", "%Y-%m-%d %H:%M") if self._mp_now > next_occurrence: next_occurrence = next_occurrence + timedelta(days=1) diff = next_occurrence - self._mp_now - time_distance = (floor(diff.total_seconds()/60)) + time_distance = floor(diff.total_seconds() / 60) details = f"at {next_occurrence.time()}" - elif schedule_type == 'hourly': + elif schedule_type == "hourly": time_distance = 60 - details = 'every hour' - elif schedule_type == 'weekly': - occurrence_time = schedule['at'] - occurrence_day = schedule['day-of-week'] + details = "every hour" + elif schedule_type == "weekly": + occurrence_time = schedule["at"] + occurrence_day = schedule["day-of-week"] occurrence_wday = time.strptime(occurrence_day, "%A").tm_wday now_wday = self._mp_now.weekday() diff_days = (0 if occurrence_wday >= now_wday else 7) + occurrence_wday - now_wday next_occurrence_date = (self._mp_now + timedelta(days=diff_days)).date() - next_occurrence = datetime.strptime(f"{str(next_occurrence_date)} {occurrence_time}", '%Y-%m-%d %H:%M') + next_occurrence = datetime.strptime(f"{str(next_occurrence_date)} {occurrence_time}", "%Y-%m-%d %H:%M") if self._mp_now > next_occurrence: next_occurrence = next_occurrence + timedelta(days=7) diff = next_occurrence - self._mp_now - time_distance = (floor(diff.total_seconds()/60)) + time_distance = floor(diff.total_seconds() / 60) details = f"in {str(diff).split('.')[0]}" - - elif schedule_type.split('-')[0] == 'every': - if schedule_type.split('-')[1] == 'min': + + elif schedule_type.split("-")[0] == "every": + if schedule_type.split("-")[1] == "min": time_distance = 1 - details = 'every minute' - elif schedule_type.split('-')[1] == 'hour': + details = "every minute" + elif schedule_type.split("-")[1] == "hour": time_distance = 60 - details = 'hourly' - elif schedule_type.split('-')[1].isnumeric(): - time_distance = int(schedule_type.split('-')[1]) - details = f'every {time_distance} minutes' + details = "hourly" + elif schedule_type.split("-")[1].isnumeric(): + time_distance = int(schedule_type.split("-")[1]) + details = f"every {time_distance} minutes" else: - raise exceptions.MalformedResponseException(f'Unknown schedule type: {schedule_type}.') - elif schedule_type == 'real-time': - details = 'unpredictable (real-time)' + raise exceptions.MalformedResponseException(f"Unknown schedule type: {schedule_type}.") + elif schedule_type == "real-time": + details = "unpredictable (real-time)" else: - raise exceptions.MalformedResponseException(f'Unknown schedule type: {schedule_type}.') + raise exceptions.MalformedResponseException(f"Unknown schedule type: {schedule_type}.") return time_distance, details def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: - """ + """Check if any Dynamic Update job is scheduled to run within the specified time window. + + # Parameters + + test_window (int, optional): (defaults to 60 minutes). A time window in minutes to look for an update job occurrence. + Has to be a value between `60` and `10080` (1 week equivalent). The time window is calculated based on the device's + local time (taken from the management plane). + + # Raises + + MalformedResponseException: Thrown in case API response does not meet expectations. + + # Returns + + CheckResult: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking \ + value of: + + * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there is no update job + planned within the test time window. + * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` + field contains information about the planned jobs with next occurrence time provided if possible. + * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the `test_window` parameter + does not meet criteria. + """ if not isinstance(test_window, int): - raise exceptions.WrongDataTypeException(f"The test_windows parameter should be of type , got {type(test_window)} instead.") + raise exceptions.WrongDataTypeException( + f"The test_windows parameter should be of type , got {type(test_window)} instead." + ) result = CheckResult() @@ -978,16 +1023,15 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: schedules_in_window = [] for name, schedule in schedules.items(): - schedule_details = schedule['recurring'] + schedule_details = schedule["recurring"] - if 'sync-to-peer' in schedule_details: - schedule_details.pop('sync-to-peer') + if "sync-to-peer" in schedule_details: + schedule_details.pop("sync-to-peer") - if 'none' not in schedule_details: + if "none" not in schedule_details: time_distance, details = self._calculate_time_distance( - schedule_type=next(iter(schedule_details.keys())), - schedule=next(iter(schedule_details.values())) - ) + schedule_type=next(iter(schedule_details.keys())), schedule=next(iter(schedule_details.values())) + ) if time_distance <= test_window: schedules_in_window.append(f"{name} ({details})") diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index 0de4b75..688d9ff 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -971,18 +971,7 @@ def get_mp_clock(self) -> datetime: # Returns - dict: The clock information represented as a dictionary. - - ```python showLineNumbers title="Sample output" - { - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' - } - ``` + datetime: The clock information represented as a `datetime` object. """ time_string = self.op_parser(cmd="show clock") @@ -1009,18 +998,7 @@ def get_dp_clock(self) -> dict: # Returns - dict: The clock information represented as a dictionary. - - ```python showLineNumbers title="Sample output" - { - 'time': '00:41:36', - 'tz': 'PDT', - 'day': '19', - 'month': 'Apr', - 'year': '2023', - 'day_of_week': 'Wed' - } - ``` + datetime: The clock information represented as a `datetime` object. """ response = self.op_parser(cmd="show clock more") @@ -1197,6 +1175,7 @@ def get_update_schedules(self) -> dict: """ schedules = self.op_parser( cmd="devices/entry/deviceconfig/system/update-schedule", - cmd_in_xml=True) + cmd_in_xml=True, + ) return schedules["update-schedule"] diff --git a/panos_upgrade_assurance/utils.py b/panos_upgrade_assurance/utils.py index 73e5840..41d00d1 100644 --- a/panos_upgrade_assurance/utils.py +++ b/panos_upgrade_assurance/utils.py @@ -336,6 +336,3 @@ def printer(report: dict, indent_level: int = 0) -> None: # pragma: no cover - printer(v, indent_level + 1) else: print(f"{delim * indent_level} {k}: {v}") - -def distance_in_minutes(**kwargs) -> int: - pass \ No newline at end of file From 092a3451c232956565c2e4804738bb6273b0c54e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Fri, 4 Aug 2023 12:13:02 +0200 Subject: [PATCH 05/13] cleanup ignored files --- coverage.xml | 926 --------------------------------------------------- 1 file changed, 926 deletions(-) delete mode 100644 coverage.xml diff --git a/coverage.xml b/coverage.xml deleted file mode 100644 index 8a30cc9..0000000 --- a/coverage.xml +++ /dev/null @@ -1,926 +0,0 @@ - - - - - - /Users/lpawlega/Git/public_code/pan-os-upgrade-assurance/panos_upgrade_assurance - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - From e6eb1a9e3f0d9714d3e9d1574473cc3f5d3ad621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Thu, 10 Aug 2023 11:03:09 +0200 Subject: [PATCH 06/13] updating tests --- Makefile | 2 +- .../api/check_firewall.md | 22 +-- .../api/firewall_proxy.md | 68 -------- .../run_low_level_methods.py | 52 +++--- .../readiness_checks/run_readiness_checks.py | 1 - panos_upgrade_assurance/check_firewall.py | 50 ++---- panos_upgrade_assurance/firewall_proxy.py | 77 +-------- panos_upgrade_assurance/utils.py | 1 - tests/test_check_firewall.py | 158 ++++++++++++++---- tests/test_firewall_proxy.py | 85 ++++++++-- 10 files changed, 242 insertions(+), 274 deletions(-) diff --git a/Makefile b/Makefile index dbe2e0e..66b565c 100644 --- a/Makefile +++ b/Makefile @@ -29,4 +29,4 @@ check_line_length: done < "$$FILE"; \ done -all: format lint security test_coverage documentation +all: lint format security test_coverage documentation diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index 2e7db37..a3b62ff 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -459,28 +459,11 @@ __Returns__ * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the certificate's properties (installed or required) are not supported. -### `CheckFirewall.check_non_finished_jobs` - -```python -def check_non_finished_jobs() -> CheckResult -``` - -Check for any job with status different than FIN. - -__Returns__ - - -`CheckResult`: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking value of: - -* [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when all jobs are in FIN state. -* [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` - field contains information about the 1st job found with status different than FIN (job ID and the actual - status). - ### `CheckFirewall._calculate_time_distance` ```python -def _calculate_time_distance(schedule_type: str, schedule: dict) -> (int, str) +def _calculate_time_distance(now_dt: datetime, schedule_type: str, + schedule: dict) -> (int, str) ``` A method that calculates the time distance between two `datetime` objects. @@ -493,6 +476,7 @@ to be already available. __Parameters__ +- __now_dt__ (`datetime`): A `datetime` object representing the current moment in time. - __schedule_type__ (`str`): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, `real-time`. - __schedule__ (`dict`): Value of the `recurring` key in the API response, see [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 57321a5..3ab5d5f 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -934,74 +934,6 @@ __Returns__ } ``` -### `FirewallProxy.get_jobs` - -```python -def get_jobs() -> dict -``` - -Get details on all jobs. - -This method retrieves all jobs and their details, this means running, pending, finished, etc. - -The actual API command is `show jobs all`. - -__Returns__ - - -`dict`: All jobs found on the device, indexed by the ID of a job. - -```python showLineNumbers title="Sample output" -{'1': {'description': None, - 'details': {'line': ['ID population failed', - 'Client logrcvr registered in the middle of a ' - 'commit/validate. Aborting current ' - 'commit/validate.', - 'Commit failed', - 'Failed to commit policy to device']}, - 'positionInQ': '0', - 'progress': '100', - 'queued': 'NO', - 'result': 'FAIL', - 'status': 'FIN', - 'stoppable': 'no', - 'tdeq': '00:28:32', - 'tenq': '2023/08/01 00:28:32', - 'tfin': '2023/08/01 00:28:36', - 'type': 'AutoCom', - 'user': None, - 'warnings': None}, -'2': {'description': None, - 'details': {'line': ['Configuration committed successfully', - 'Successfully committed last configuration']}, - 'positionInQ': '0', - 'progress': '100', - 'queued': 'NO', - 'result': 'OK', - 'status': 'FIN', - 'stoppable': 'no', - 'tdeq': '00:28:40', - 'tenq': '2023/08/01 00:28:40', - 'tfin': '2023/08/01 00:29:20', - 'type': 'AutoCom', - 'user': None, - 'warnings': None}, -'3': {'description': None, - 'details': None, - 'positionInQ': '0', - 'progress': '30', - 'queued': 'NO', - 'result': 'PEND', - 'status': 'ACT', - 'stoppable': 'yes', - 'tdeq': '00:58:59', - 'tenq': '2023/08/01 00:58:59', - 'tfin': None, - 'type': 'Downld', - 'user': None, - 'warnings': None}} -``` - ### `FirewallProxy.get_update_schedules` ```python diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index 23418f4..d2305ad 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -4,6 +4,7 @@ from panos.panorama import Panorama from argparse import ArgumentParser from getpass import getpass +from pprint import pprint if __name__ == "__main__": argparser = ArgumentParser( @@ -74,51 +75,50 @@ hostname=address, api_password=password, api_username=username, vsys=vsys ) - p_config = firewall.is_panorama_configured() - print(f"\n panorama configured: {p_config}") + # p_config = firewall.is_panorama_configured() + # print(f"\n panorama configured: {p_config}") - if p_config: - print(f"\n panorama connected: {firewall.is_panorama_connected()}") + # if p_config: + # print(f"\n panorama connected: {firewall.is_panorama_connected()}") - print(f"\n pending changed: {firewall.is_pending_changes()}") - print(f"\n full commit pending: {firewall.is_full_commit_required()}") + # print(f"\n pending changed: {firewall.is_pending_changes()}") + # print(f"\n full commit pending: {firewall.is_full_commit_required()}") - print(f"\n ha configuration\n{firewall.get_ha_configuration()}") + # print(f"\n ha configuration\n{firewall.get_ha_configuration()}") - print(f"\n nic statuses\n{firewall.get_nics()}") + # print(f"\n nic statuses\n{firewall.get_nics()}") - print(f"\n licenses information\n{firewall.get_licenses()}") + # print(f"\n licenses information\n{firewall.get_licenses()}") - print(f"\n support license information\n{firewall.get_support_license()}") + # print(f"\n support license information\n{firewall.get_support_license()}") - print(f"\n routes information\n{firewall.get_routes()}") + # print(f"\n routes information\n{firewall.get_routes()}") - print(f"\n arp entries information\n{firewall.get_arp_table()}") + # print(f"\n arp entries information\n{firewall.get_arp_table()}") - print(f"\n session information\n{firewall.get_session_stats()}") + # print(f"\n session information\n{firewall.get_session_stats()}") - print(f"\n session information\n{firewall.get_sessions()}") + # print(f"\n session information\n{firewall.get_sessions()}") - print(f"\n tunnels information\n{firewall.get_tunnels()}") + # print(f"\n tunnels information\n{firewall.get_tunnels()}") - print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") + # print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") - print(f"\n content DB version: {firewall.get_content_db_version()}") + # print(f"\n content DB version: {firewall.get_content_db_version()}") - print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") + # print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") - print(f"\n disk utilization: {firewall.get_disk_utilization()}") + # print(f"\n disk utilization: {firewall.get_disk_utilization()}") - print(f"\n available image versions: {firewall.get_available_image_data()}") + # print(f"\n available image versions: {firewall.get_available_image_data()}") - print(f"\n management plane clock: {firewall.get_mp_clock()}") + # print(f"\n management plane clock: {firewall.get_mp_clock()}") - print(f"\n data plane clock: {firewall.get_dp_clock()}") + # print(f"\n data plane clock: {firewall.get_dp_clock()}") - print(f"\n certificates: {firewall.get_certificates()}") + # print(f"\n certificates: {firewall.get_certificates()}") - print(f"\n jobs: {firewall.get_jobs()}") - - print(f"\n dynamic schedules: {firewall.get_update_schedules()}") + # print(f"\n dynamic schedules: {firewall.get_update_schedules()}") + pprint(firewall.get_update_schedules()) print() diff --git a/examples/readiness_checks/run_readiness_checks.py b/examples/readiness_checks/run_readiness_checks.py index bf25dde..944c62d 100755 --- a/examples/readiness_checks/run_readiness_checks.py +++ b/examples/readiness_checks/run_readiness_checks.py @@ -83,7 +83,6 @@ "panorama", "ntp_sync", "candidate_config", - "check_non_finished_jobs", "active_support", # checks below have optional configuration { diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index c68b7be..e44686e 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -95,7 +95,6 @@ def __init__(self, node: FirewallProxy, skip_force_locale: Optional[bool] = Fals CheckType.FREE_DISK_SPACE: self.check_free_disk_space, CheckType.MP_DP_CLOCK_SYNC: self.check_mp_dp_sync, CheckType.CERTS: self.check_ssl_cert_requirements, - CheckType.JOBS: self.check_non_finished_jobs, CheckType.UPDATES: self.check_scheduled_updates, } if not skip_force_locale: @@ -864,33 +863,7 @@ def check_ssl_cert_requirements(self, rsa: dict = {}, ecdsa: dict = {}) -> Check result.status = CheckStatus.SUCCESS return result - def check_non_finished_jobs(self) -> CheckResult: - """Check for any job with status different than FIN. - - # Returns - - CheckResult: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking \ - value of: - - * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when all jobs are in FIN state. - * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` - field contains information about the 1st job found with status different than FIN (job ID and the actual - status). - - """ - result = CheckResult() - - all_jobs = self._node.get_jobs() - - for jid, job in all_jobs.items(): - if job["status"] != "FIN": - result.reason = f"At least one job (ID={jid}) is not in finished state (state={job['status']})." - return result - - result.status = CheckStatus.SUCCESS - return result - - def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, str): + def _calculate_time_distance(self, now_dt: datetime, schedule_type: str, schedule: dict) -> (int, str): """A method that calculates the time distance between two `datetime` objects. :::note @@ -900,6 +873,7 @@ def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, # Parameters + now_dt (datetime): A `datetime` object representing the current moment in time. schedule_type (str): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, `real-time`. schedule (dict): Value of the `recurring` key in the API response, see [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) @@ -919,11 +893,11 @@ def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, if schedule_type == "daily": occurrence = schedule["at"] - next_occurrence = datetime.strptime(f"{str(self._mp_now.date())} {occurrence}", "%Y-%m-%d %H:%M") + next_occurrence = datetime.strptime(f"{str(now_dt.date())} {occurrence}", "%Y-%m-%d %H:%M") - if self._mp_now > next_occurrence: + if now_dt > next_occurrence: next_occurrence = next_occurrence + timedelta(days=1) - diff = next_occurrence - self._mp_now + diff = next_occurrence - now_dt time_distance = floor(diff.total_seconds() / 60) details = f"at {next_occurrence.time()}" @@ -934,15 +908,15 @@ def _calculate_time_distance(self, schedule_type: str, schedule: dict) -> (int, occurrence_time = schedule["at"] occurrence_day = schedule["day-of-week"] occurrence_wday = time.strptime(occurrence_day, "%A").tm_wday - now_wday = self._mp_now.weekday() + now_wday = now_dt.weekday() diff_days = (0 if occurrence_wday >= now_wday else 7) + occurrence_wday - now_wday - next_occurrence_date = (self._mp_now + timedelta(days=diff_days)).date() + next_occurrence_date = (now_dt + timedelta(days=diff_days)).date() next_occurrence = datetime.strptime(f"{str(next_occurrence_date)} {occurrence_time}", "%Y-%m-%d %H:%M") - if self._mp_now > next_occurrence: + if now_dt > next_occurrence: next_occurrence = next_occurrence + timedelta(days=7) - diff = next_occurrence - self._mp_now + diff = next_occurrence - now_dt time_distance = floor(diff.total_seconds() / 60) details = f"in {str(diff).split('.')[0]}" @@ -998,7 +972,7 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: result = CheckResult() - self._mp_now = self._node.get_mp_clock() + mp_now = self._node.get_mp_clock() schedules = self._node.get_update_schedules() if not schedules: @@ -1030,7 +1004,9 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: if "none" not in schedule_details: time_distance, details = self._calculate_time_distance( - schedule_type=next(iter(schedule_details.keys())), schedule=next(iter(schedule_details.values())) + now_dt=mp_now, + schedule_type=next(iter(schedule_details.keys())), + schedule=next(iter(schedule_details.values())), ) if time_distance <= test_window: schedules_in_window.append(f"{name} ({details})") diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index 688d9ff..59bf5d0 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -1081,79 +1081,6 @@ def get_certificates(self) -> dict: return result - def get_jobs(self) -> dict: - """Get details on all jobs. - - This method retrieves all jobs and their details, this means running, pending, finished, etc. - - The actual API command is `show jobs all`. - - # Returns - - dict: All jobs found on the device, indexed by the ID of a job. - - ```python showLineNumbers title="Sample output" - {'1': {'description': None, - 'details': {'line': ['ID population failed', - 'Client logrcvr registered in the middle of a ' - 'commit/validate. Aborting current ' - 'commit/validate.', - 'Commit failed', - 'Failed to commit policy to device']}, - 'positionInQ': '0', - 'progress': '100', - 'queued': 'NO', - 'result': 'FAIL', - 'status': 'FIN', - 'stoppable': 'no', - 'tdeq': '00:28:32', - 'tenq': '2023/08/01 00:28:32', - 'tfin': '2023/08/01 00:28:36', - 'type': 'AutoCom', - 'user': None, - 'warnings': None}, - '2': {'description': None, - 'details': {'line': ['Configuration committed successfully', - 'Successfully committed last configuration']}, - 'positionInQ': '0', - 'progress': '100', - 'queued': 'NO', - 'result': 'OK', - 'status': 'FIN', - 'stoppable': 'no', - 'tdeq': '00:28:40', - 'tenq': '2023/08/01 00:28:40', - 'tfin': '2023/08/01 00:29:20', - 'type': 'AutoCom', - 'user': None, - 'warnings': None}, - '3': {'description': None, - 'details': None, - 'positionInQ': '0', - 'progress': '30', - 'queued': 'NO', - 'result': 'PEND', - 'status': 'ACT', - 'stoppable': 'yes', - 'tdeq': '00:58:59', - 'tenq': '2023/08/01 00:58:59', - 'tfin': None, - 'type': 'Downld', - 'user': None, - 'warnings': None}} - ``` - - """ - jobs = self.op_parser(cmd="show jobs all") - results = dict() - - for job in jobs["job"]: - jid = job["id"] - job.pop("id") - results[jid] = job - - return results - def get_update_schedules(self) -> dict: """Get schedules for all dynamic updates. @@ -1177,5 +1104,9 @@ def get_update_schedules(self) -> dict: cmd="devices/entry/deviceconfig/system/update-schedule", cmd_in_xml=True, ) + if schedules is None: + return {} + if "update-schedule" not in schedules: + return {} return schedules["update-schedule"] diff --git a/panos_upgrade_assurance/utils.py b/panos_upgrade_assurance/utils.py index 41d00d1..2d4d5fe 100644 --- a/panos_upgrade_assurance/utils.py +++ b/panos_upgrade_assurance/utils.py @@ -28,7 +28,6 @@ class CheckType: FREE_DISK_SPACE = "free_disk_space" MP_DP_CLOCK_SYNC = "planes_clock_sync" CERTS = "certificates_requirements" - JOBS = "jobs" UPDATES = "dynamic_updates" diff --git a/tests/test_check_firewall.py b/tests/test_check_firewall.py index 016aa13..0bc5d24 100644 --- a/tests/test_check_firewall.py +++ b/tests/test_check_firewall.py @@ -11,7 +11,9 @@ ContentDBVersionsFormatException, WrongDiskSizeFormatException, UnknownParameterException, + MalformedResponseException, ) +from datetime import datetime @pytest.fixture @@ -618,44 +620,34 @@ def test_check_mp_dp_sync_wrong_input_data(self, check_firewall_mock): assert str(exception_msg.value) == "[diff_threshold] should be of type [int] but is of type []." def test_check_mp_dp_sync_time_diff(self, check_firewall_mock): - check_firewall_mock._node.get_mp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } - check_firewall_mock._node.get_dp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:52:34", - "tz": "PDT", - "year": "2023", - } + check_firewall_mock._node.get_mp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) + check_firewall_mock._node.get_dp_clock.return_value = datetime.strptime( + "Wed May 31 11:52:34 2023", "%a %b %d %H:%M:%S %Y" + ) assert check_firewall_mock.check_mp_dp_sync(1) == CheckResult( status=CheckStatus.FAIL, reason="The data plane clock and management clock are different by 133.0 seconds." ) + def test_check_mp_dp_sync_time_diff_with_threshold(self, check_firewall_mock): + check_firewall_mock._node.get_mp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) + check_firewall_mock._node.get_dp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:34 2023", "%a %b %d %H:%M:%S %Y" + ) + + assert check_firewall_mock.check_mp_dp_sync(30) == CheckResult(status=CheckStatus.SUCCESS) + def test_check_mp_dp_sync_time_synced(self, check_firewall_mock): - check_firewall_mock._node.get_mp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } - check_firewall_mock._node.get_dp_clock.return_value = { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } + check_firewall_mock._node.get_mp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) + check_firewall_mock._node.get_dp_clock.return_value = datetime.strptime( + "Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y" + ) assert check_firewall_mock.check_mp_dp_sync(1) == CheckResult(status=CheckStatus.SUCCESS) @@ -909,6 +901,108 @@ def test_check_ssl_cert_requirements_success(self, check_firewall_mock): assert check_firewall_mock.check_ssl_cert_requirements(rsa=rsa, ecdsa=ecdsa) == CheckResult(status=CheckStatus.SUCCESS) + @pytest.mark.parametrize( + "param_now_dts, param_schedule_type, param_schedule, param_time_d, param_details", + [ + ( + "2023-08-07 00:00:00", # this is Monday + "daily", + {"action": "download-and-install", "at": "07:45"}, + 465, + "at 07:45:00", + ), + ("2023-08-07 00:00:00", "hourly", {"action": "download-and-install", "at": "0"}, 60, "every hour"), # this is Monday + ( + "2023-08-07 00:00:00", # this is Monday + "every-5-mins", + {"action": "download-and-install", "at": "1"}, + 5, + "every 5 minutes", + ), + ("2023-08-07 00:00:00", "real-time", None, 0, "unpredictable (real-time)"), # this is Monday + ], + ) + def test__calculate_time_distance( + self, param_now_dts, param_schedule_type, param_schedule, param_time_d, param_details, check_firewall_mock + ): + mock_now_dt = datetime.strptime(param_now_dts, "%Y-%m-%d %H:%M:%S") + + time_delta, delta_reason = check_firewall_mock._calculate_time_distance(mock_now_dt, param_schedule_type, param_schedule) + + assert time_delta == param_time_d + assert delta_reason == param_details + + @pytest.mark.parametrize("param_schedule_type", ["every-something", "something"]) + def test__calculate_time_distance_exception(self, param_schedule_type, check_firewall_mock): + with pytest.raises(MalformedResponseException) as exception_msg: + check_firewall_mock._calculate_time_distance(datetime.now(), param_schedule_type, None) + + assert str(exception_msg.value) == f"Unknown schedule type: {param_schedule_type}." + + @pytest.mark.parametrize( + "param_now_dts, param_test_window, param_schedules_block, check_result", + [ + ( + "2023-08-07 00:00:00", # this is Monday + 120, + {"anti-virus": {"recurring": {"daily": {"action": "download-and-install", "at": "07:45"}}}}, + CheckResult(CheckStatus.SUCCESS, ""), + ), + ( + "2023-08-07 00:00:00", # this is Monday + 120, + {"anti-virus": {"recurring": {"real-time": None}}}, + CheckResult( + CheckStatus.FAIL, "Following schedules fall into test window: anti-virus (unpredictable (real-time))." + ), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 60, + {"anti-virus": {"recurring": {"daily": {"action": "download-and-install", "at": "07:45"}}}}, + CheckResult(CheckStatus.FAIL, "Following schedules fall into test window: anti-virus (at 07:45:00)."), + ), + ( + "2023-08-07 00:00:00", # this is Monday + 180, + { + "global-protect-datafile": { + "recurring": {"weekly": {"action": "download-and-install", "at": "02:45", "day-of-week": "monday"}} + }, + "threats": {"recurring": {"daily": {"action": "download-and-install", "at": "15:30"}}}, + }, + CheckResult(CheckStatus.FAIL, "Following schedules fall into test window: global-protect-datafile (in 2:45:00)."), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 20, + {}, + CheckResult(CheckStatus.SKIPPED, "No scheduled job present on the device."), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 20, + {"anti-virus": {"recurring": {"real-time": None}}}, + CheckResult(CheckStatus.ERROR, "Schedules test window is below the supported, safe minimum of 60 minutes."), + ), + ( + "2023-08-07 07:00:00", # this is Monday + 10081, + {"anti-virus": {"recurring": {"real-time": None}}}, + CheckResult(CheckStatus.ERROR, "Schedules test window is set to over 1 week. This test will always fail."), + ), + ], + ) + def test_check_scheduled_updates( + self, param_now_dts, param_test_window, param_schedules_block, check_result, check_firewall_mock + ): + now_dt = datetime.strptime(param_now_dts, "%Y-%m-%d %H:%M:%S") + check_firewall_mock._node.get_mp_clock = lambda: now_dt + + check_firewall_mock._node.get_update_schedules = lambda: param_schedules_block + + assert check_firewall_mock.check_scheduled_updates(param_test_window) == check_result + def test_run_readiness_checks(self, check_firewall_mock): check_firewall_mock._check_method_mapping = { "check1": MagicMock(return_value=True), diff --git a/tests/test_firewall_proxy.py b/tests/test_firewall_proxy.py index 9a0f5fb..ba3565b 100644 --- a/tests/test_firewall_proxy.py +++ b/tests/test_firewall_proxy.py @@ -13,6 +13,7 @@ DeviceNotLicensedException, UpdateServerConnectivityException, ) +from datetime import datetime @pytest.fixture(scope="function") @@ -1021,14 +1022,9 @@ def test_get_mp_clock(self, fw_proxy_mock): raw_response = ET.fromstring(xml_text) fw_proxy_mock.op.return_value = raw_response - assert fw_proxy_mock.get_mp_clock() == { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:50:21", - "tz": "PDT", - "year": "2023", - } + response = datetime.strptime("Wed May 31 11:50:21 2023", "%a %b %d %H:%M:%S %Y") + + assert fw_proxy_mock.get_mp_clock() == response def test_get_dp_clock(self, fw_proxy_mock): xml_text = """ @@ -1041,14 +1037,9 @@ def test_get_dp_clock(self, fw_proxy_mock): raw_response = ET.fromstring(xml_text) fw_proxy_mock.op.return_value = raw_response - assert fw_proxy_mock.get_dp_clock() == { - "day": "31", - "day_of_week": "Wed", - "month": "May", - "time": "11:52:34", - "tz": "PDT", - "year": "2023", - } + response = datetime.strptime("Wed May 31 11:52:34 2023", "%a %b %d %H:%M:%S %Y") + + assert fw_proxy_mock.get_dp_clock() == response def test_get_certificates(self, fw_proxy_mock): xml_text = """ @@ -1143,3 +1134,65 @@ def test_get_certificates_no_certificate(self, fw_proxy_mock): fw_proxy_mock.op.return_value = raw_response assert fw_proxy_mock.get_certificates() == {} + + def test_get_update_schedules(self, fw_proxy_mock): + xml_text = """ + + + + + + yes + + 15:30 + download-and-install + + + + + + + 07:45 + download-and-install + + + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.op.return_value = raw_response + + response = { + "anti-virus": {"recurring": {"daily": {"action": "download-and-install", "at": "07:45"}}}, + "threats": {"recurring": {"daily": {"action": "download-and-install", "at": "15:30"}, "sync-to-peer": "yes"}}, + } + + assert fw_proxy_mock.get_update_schedules() == response + + def test_get_update_schedules_empty_response(self, fw_proxy_mock): + xml_text = """ + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.op.return_value = raw_response + + assert fw_proxy_mock.get_update_schedules() == {} + + def test_get_update_schedules_no_update_schedules_key(self, fw_proxy_mock): + xml_text = """ + + + + + + + """ + raw_response = ET.fromstring(xml_text) + fw_proxy_mock.op.return_value = raw_response + + assert fw_proxy_mock.get_update_schedules() == {} From 2eb499201ebf525fdf2ed98bb135c02b8b5c2779 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Thu, 10 Aug 2023 11:33:37 +0200 Subject: [PATCH 07/13] pr suggestions --- docs/panos-upgrade-assurance/api/check_firewall.md | 6 +++--- panos_upgrade_assurance/check_firewall.py | 9 ++++----- tests/test_check_firewall.py | 10 ++++++---- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index a3b62ff..920bf47 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -459,11 +459,11 @@ __Returns__ * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the certificate's properties (installed or required) are not supported. -### `CheckFirewall._calculate_time_distance` +### `CheckFirewall._calculate_schedule_time_diff` ```python -def _calculate_time_distance(now_dt: datetime, schedule_type: str, - schedule: dict) -> (int, str) +def _calculate_schedule_time_diff(now_dt: datetime, schedule_type: str, + schedule: dict) -> (int, str) ``` A method that calculates the time distance between two `datetime` objects. diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index e44686e..106a1cc 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -863,7 +863,7 @@ def check_ssl_cert_requirements(self, rsa: dict = {}, ecdsa: dict = {}) -> Check result.status = CheckStatus.SUCCESS return result - def _calculate_time_distance(self, now_dt: datetime, schedule_type: str, schedule: dict) -> (int, str): + def _calculate_schedule_time_diff(self, now_dt: datetime, schedule_type: str, schedule: dict) -> (int, str): """A method that calculates the time distance between two `datetime` objects. :::note @@ -926,7 +926,7 @@ def _calculate_time_distance(self, now_dt: datetime, schedule_type: str, schedul details = "every minute" elif schedule_type.split("-")[1] == "hour": time_distance = 60 - details = "hourly" + details = "every hour" elif schedule_type.split("-")[1].isnumeric(): time_distance = int(schedule_type.split("-")[1]) details = f"every {time_distance} minutes" @@ -989,21 +989,20 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: result.reason = "Schedules test window is set to over 1 week. This test will always fail." return result + schedules_in_window = [] for name, schedule in schedules.items(): if "recurring" not in schedule.keys(): raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration, missing a schedule..") if len(schedule) != 1: raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration: {schedule}") - schedules_in_window = [] - for name, schedule in schedules.items(): schedule_details = schedule["recurring"] if "sync-to-peer" in schedule_details: schedule_details.pop("sync-to-peer") if "none" not in schedule_details: - time_distance, details = self._calculate_time_distance( + time_distance, details = self._calculate_schedule_time_diff( now_dt=mp_now, schedule_type=next(iter(schedule_details.keys())), schedule=next(iter(schedule_details.values())), diff --git a/tests/test_check_firewall.py b/tests/test_check_firewall.py index 0bc5d24..f4b7412 100644 --- a/tests/test_check_firewall.py +++ b/tests/test_check_firewall.py @@ -922,20 +922,22 @@ def test_check_ssl_cert_requirements_success(self, check_firewall_mock): ("2023-08-07 00:00:00", "real-time", None, 0, "unpredictable (real-time)"), # this is Monday ], ) - def test__calculate_time_distance( + def test__calculate_schedule_time_diff( self, param_now_dts, param_schedule_type, param_schedule, param_time_d, param_details, check_firewall_mock ): mock_now_dt = datetime.strptime(param_now_dts, "%Y-%m-%d %H:%M:%S") - time_delta, delta_reason = check_firewall_mock._calculate_time_distance(mock_now_dt, param_schedule_type, param_schedule) + time_delta, delta_reason = check_firewall_mock._calculate_schedule_time_diff( + mock_now_dt, param_schedule_type, param_schedule + ) assert time_delta == param_time_d assert delta_reason == param_details @pytest.mark.parametrize("param_schedule_type", ["every-something", "something"]) - def test__calculate_time_distance_exception(self, param_schedule_type, check_firewall_mock): + def test__calculate_schedule_time_diff_exception(self, param_schedule_type, check_firewall_mock): with pytest.raises(MalformedResponseException) as exception_msg: - check_firewall_mock._calculate_time_distance(datetime.now(), param_schedule_type, None) + check_firewall_mock._calculate_schedule_time_diff(datetime.now(), param_schedule_type, None) assert str(exception_msg.value) == f"Unknown schedule type: {param_schedule_type}." From bab1ccf53bd4ce648c9cd85cf2a99809305ea4fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= <42772730+FoSix@users.noreply.github.com> Date: Thu, 10 Aug 2023 11:33:57 +0200 Subject: [PATCH 08/13] Update panos_upgrade_assurance/firewall_proxy.py Co-authored-by: Alp Eren Kose --- panos_upgrade_assurance/firewall_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index 59bf5d0..88df029 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -1084,7 +1084,7 @@ def get_certificates(self) -> dict: def get_update_schedules(self) -> dict: """Get schedules for all dynamic updates. - This method gets all scheduled running on a device. This includes the ones pushed from Panorama. + This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama. The actual API command is `devices/entry/deviceconfig/system/update-schedule`. From 93086e6b6054f08d27e5ed284856141e34953c50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Thu, 10 Aug 2023 11:48:49 +0200 Subject: [PATCH 09/13] update documentation --- docs/panos-upgrade-assurance/api/firewall_proxy.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 3ab5d5f..d6c9925 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -942,7 +942,7 @@ def get_update_schedules() -> dict Get schedules for all dynamic updates. -This method gets all scheduled running on a device. This includes the ones pushed from Panorama. +This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama. The actual API command is `devices/entry/deviceconfig/system/update-schedule`. From 2b7a1d98d55d2e30750c61ec0f0bec47b52b6508 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Fri, 11 Aug 2023 10:32:37 +0200 Subject: [PATCH 10/13] update configuration details document --- .../configuration_details.mdx | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/docs/panos-upgrade-assurance/configuration_details.mdx b/docs/panos-upgrade-assurance/configuration_details.mdx index 2913773..e65414e 100644 --- a/docs/panos-upgrade-assurance/configuration_details.mdx +++ b/docs/panos-upgrade-assurance/configuration_details.mdx @@ -270,6 +270,7 @@ checks_configuration = [ } }, {'content_version': {'version': '8634-7678'}}, + {'dynamic_updates': {'test_window': 120}}, {"expired_licenses": {"skip_licenses": ["Threat Prevention"]}}, {'free_disk_space': {'image_version': '10.1.6-h6'}}, {'ha': {'skip_config_sync': True}}, @@ -309,6 +310,8 @@ checks_configuration: hash_method: "sha1" - content_version: version: "8634-7678" + - dynamic_updates: + test_window: 120 - expired_licenses: skip_licenses: - "Threat Prevention" @@ -522,6 +525,52 @@ checks_configuration: ``` +### `dynamic_updates` + +Check if any Dynamic Update job is scheduled to run within the specified time window. + +**Method:** [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) + +**Configuration parameters** + +paramter | description +--- | --- +`test_window` | (optional) time window in minutes to look for an update job occurrence + +**Sample configuration** + +```mdx-code-block + + +``` + +```python showLineNumbers +checks_configuration = [ + { + 'dynamic_updates': { + 'test_window': 120 + } + } +] +``` + +```mdx-code-block + + +``` + +```yaml showLineNumbers +checks_configuration: + - dynamic_updates: + test_window: 120 +``` + +```mdx-code-block + + +``` + + ### `free_disk_space` Checks if there is enough free space on the `/opt/panrepo` volume to download a PanOS image before an upgrade. From 1329284f289196a90551267aa62596e5db082370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Tue, 22 Aug 2023 13:56:24 +0200 Subject: [PATCH 11/13] refactor methods to use config/get XMLAPI instead of OP commands --- .gitignore | 1 - .../api/check_firewall.md | 15 ++- .../panos-upgrade-assurance/api/exceptions.md | 4 + .../api/firewall_proxy.md | 75 +++++++++++++-- .../configuration_details.mdx | 5 + .../run_low_level_methods.py | 48 +++++----- file | 0 panos_upgrade_assurance/check_firewall.py | 68 ++++++++------ panos_upgrade_assurance/exceptions.py | 6 ++ panos_upgrade_assurance/firewall_proxy.py | 93 ++++++++++++++++--- 10 files changed, 236 insertions(+), 79 deletions(-) create mode 100644 file diff --git a/.gitignore b/.gitignore index 7120225..d876353 100644 --- a/.gitignore +++ b/.gitignore @@ -61,5 +61,4 @@ sidebar.json requirements.txt openssl.cnf .coverage -openssl.cnf coverage.xml diff --git a/docs/panos-upgrade-assurance/api/check_firewall.md b/docs/panos-upgrade-assurance/api/check_firewall.md index fcabc9a..d3197b5 100644 --- a/docs/panos-upgrade-assurance/api/check_firewall.md +++ b/docs/panos-upgrade-assurance/api/check_firewall.md @@ -469,18 +469,20 @@ def _calculate_schedule_time_diff(now_dt: datetime, schedule_type: str, A method that calculates the time distance between two `datetime` objects. :::note -This method is used only by [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) method and it expects some information +This method is used only by [`CheckFirewall.check_scheduled_updates()`](#checkfirewallcheck_scheduled_updates) method and it expects some information to be already available. ::: __Parameters__ -- __now_dt__ (`datetime`): A `datetime` object representing the current moment in time. +- __now_dt__ (`datetime`): A `datetime` object representing the current moment in time. Ideally this should be the device's local + time, taken from the management plane clock. - __schedule_type__ (`str`): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, `real-time`. -- __schedule__ (`dict`): Value of the `recurring` key in the API response, see [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) - documentation for details. +- __schedule__ (`dict`): Value of the `recurring` key in the API response, see + [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) + documentation for details. Both formats (locally configured and pushed from a Panorama template) are supported. __Raises__ @@ -500,6 +502,9 @@ def check_scheduled_updates(test_window: int = 60) -> CheckResult Check if any Dynamic Update job is scheduled to run within the specified time window. +When device is configured via Panorama, this includes schedules set up in Templates. It does not however include schedules +configured in `Panorama/Device Deployment/Dynamic Updates/Schedules`. + __Parameters__ @@ -518,7 +523,7 @@ __Returns__ `CheckResult`: Object of [`CheckResult`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkresult) class taking value of: * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there is no update job - planned within the test time window. + planned within the test window. * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` field contains information about the planned jobs with next occurrence time provided if possible. * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the `test_window` parameter diff --git a/docs/panos-upgrade-assurance/api/exceptions.md b/docs/panos-upgrade-assurance/api/exceptions.md index 9e25674..20b80ba 100644 --- a/docs/panos-upgrade-assurance/api/exceptions.md +++ b/docs/panos-upgrade-assurance/api/exceptions.md @@ -32,6 +32,10 @@ Parent class for all exceptions coming from [Utils](/panos/docs/panos-upgrade-as Used when a command run on a device does not return the `success` status. +## class `GetXpathConfigFailedException` + +Used when XAPI does not return a `success` state when running a `get` operation. + ## class `MalformedResponseException` A generic exception class used when a response does not meet the expected standards. diff --git a/docs/panos-upgrade-assurance/api/firewall_proxy.md b/docs/panos-upgrade-assurance/api/firewall_proxy.md index 6204a8e..bcf54a4 100644 --- a/docs/panos-upgrade-assurance/api/firewall_proxy.md +++ b/docs/panos-upgrade-assurance/api/firewall_proxy.md @@ -35,8 +35,7 @@ Execute a command on node, parse, and return response. This is just a wrapper around the [`Firewall.op()`](https://pan-os-python.readthedocs.io/en/latest/module-firewall.html#panos.firewall.Firewall.op) method. -It additionally does basic error handling and tries to extract the actual device -response. +It additionally does basic error handling and tries to extract the actual device response. __Parameters__ @@ -57,6 +56,36 @@ __Returns__ `dict, xml.etree.ElementTree.Element`: The actual command output. A type is defined by the `return_xml` parameter. +### `FirewallProxy.get_parser` + +```python +def get_parser(xml_path: str, + return_xml: Optional[bool] = False) -> Union[dict, ET.Element] +``` + +Execute a configuration get command on a node, parse and return response. + +This is a wrapper around the +[`pan.xapi.get()` method](https://github.com/kevinsteves/pan-python/blob/master/doc/pan.xapi.rst#getxpathnone) +from the [`pan-python` package](https://pypi.org/project/pan-python/). +It does a basic error handling and tries to extract the actual response. + +__Parameters__ + + +- __xml_path__ (`str`): An XPATH pointing to the config to be retrieved. +- __return_xml__ (`bool`): (defaults to `False`) When set to `True`, the return data is an [`XML object`](https://docs.python.org/3/library/xml.etree.elementtree.html#xml.etree.ElementTree.Element) + instead of a Python dictionary. + +__Raises__ + + +- `GetXpathConfigFailedException`: This exception is raised when XPATH is not provided or does not exist. + +__Returns__ + +`dict, xml.etree.ElementTree.Element`: The actual command output. A type is defined by the `return_xml` parameter. + ### `FirewallProxy.is_pending_changes` ```python @@ -942,20 +971,48 @@ def get_update_schedules() -> dict Get schedules for all dynamic updates. -This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama. +This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama, +but it does not include the ones configured via `Panorama/Device Deployment/Dynamic Updates/Schedules`. -The actual API command is `devices/entry/deviceconfig/system/update-schedule`. +The actual XMLAPI command run here is `config/get` with XPATH set to +`/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule`. __Returns__ `dict`: All dynamic updates schedules, key is the entity type to update, like: threats, wildfire, etc. -```python showLineNumbers title="Sample output" -{'threats': {'recurring': {'weekly': {'action': 'download-only', - 'at': '01:02', - 'day-of-week': 'wednesday'}}}, -'wildfire': {'recurring': {'real-time': None}}} +```python showLineNumbers title="Sample output, showing values coming from Panorama" +{'@ptpl': 'lab', +'@src': 'tpl', +'anti-virus': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'hourly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'`text`': 'download-and-install', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'`text`': '0', + '@ptpl': 'lab', + '@src': 'tpl'}}}}, +'global-protect-clientless-vpn': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'weekly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'`text`': 'download-only', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'`text`': '20:00', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'day-of-week': {'`text`': 'wednesday', + '@ptpl': 'lab', + '@src': 'tpl'}}}} +} ``` ### `FirewallProxy.get_jobs` diff --git a/docs/panos-upgrade-assurance/configuration_details.mdx b/docs/panos-upgrade-assurance/configuration_details.mdx index e65414e..0408b92 100644 --- a/docs/panos-upgrade-assurance/configuration_details.mdx +++ b/docs/panos-upgrade-assurance/configuration_details.mdx @@ -529,6 +529,11 @@ checks_configuration: Check if any Dynamic Update job is scheduled to run within the specified time window. +:::note +This includes schedules pushed from Panorama via a template, but does not include the ones configured +in `Panorama/Device Deployment/Dynamic Updates/Schedules`. +::: + **Method:** [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) **Configuration parameters** diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index 575c0b0..b44f0e2 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -75,50 +75,50 @@ hostname=address, api_password=password, api_username=username, vsys=vsys ) - # p_config = firewall.is_panorama_configured() - # print(f"\n panorama configured: {p_config}") + p_config = firewall.is_panorama_configured() + print(f"\n panorama configured: {p_config}") - # if p_config: - # print(f"\n panorama connected: {firewall.is_panorama_connected()}") + if p_config: + print(f"\n panorama connected: {firewall.is_panorama_connected()}") - # print(f"\n pending changed: {firewall.is_pending_changes()}") - # print(f"\n full commit pending: {firewall.is_full_commit_required()}") + print(f"\n pending changed: {firewall.is_pending_changes()}") + print(f"\n full commit pending: {firewall.is_full_commit_required()}") - # print(f"\n ha configuration\n{firewall.get_ha_configuration()}") + print(f"\n ha configuration\n{firewall.get_ha_configuration()}") - # print(f"\n nic statuses\n{firewall.get_nics()}") + print(f"\n nic statuses\n{firewall.get_nics()}") - # print(f"\n licenses information\n{firewall.get_licenses()}") + print(f"\n licenses information\n{firewall.get_licenses()}") - # print(f"\n support license information\n{firewall.get_support_license()}") + print(f"\n support license information\n{firewall.get_support_license()}") - # print(f"\n routes information\n{firewall.get_routes()}") + print(f"\n routes information\n{firewall.get_routes()}") - # print(f"\n arp entries information\n{firewall.get_arp_table()}") + print(f"\n arp entries information\n{firewall.get_arp_table()}") - # print(f"\n session information\n{firewall.get_session_stats()}") + print(f"\n session information\n{firewall.get_session_stats()}") - # print(f"\n session information\n{firewall.get_sessions()}") + print(f"\n session information\n{firewall.get_sessions()}") - # print(f"\n tunnels information\n{firewall.get_tunnels()}") + print(f"\n tunnels information\n{firewall.get_tunnels()}") - # print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") + print(f"\n NTP SRVs information\n{firewall.get_ntp_servers()}") - # print(f"\n content DB version: {firewall.get_content_db_version()}") + print(f"\n content DB version: {firewall.get_content_db_version()}") - # print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") + print(f"\n latest availble content DB version: {firewall.get_latest_available_content_version()}") - # print(f"\n disk utilization: {firewall.get_disk_utilization()}") + print(f"\n disk utilization: {firewall.get_disk_utilization()}") - # print(f"\n available image versions: {firewall.get_available_image_data()}") + print(f"\n available image versions: {firewall.get_available_image_data()}") - # print(f"\n management plane clock: {firewall.get_mp_clock()}") + print(f"\n management plane clock: {firewall.get_mp_clock()}") - # print(f"\n data plane clock: {firewall.get_dp_clock()}") + print(f"\n data plane clock: {firewall.get_dp_clock()}") - # print(f"\n certificates: {firewall.get_certificates()}") + print(f"\n certificates: {firewall.get_certificates()}") - # print(f"\n dynamic schedules: {firewall.get_update_schedules()}") + print(f"\n dynamic schedules: {firewall.get_update_schedules()}") pprint(firewall.get_update_schedules()) print(f"\n jobs: {firewall.get_jobs()}") diff --git a/file b/file new file mode 100644 index 0000000..e69de29 diff --git a/panos_upgrade_assurance/check_firewall.py b/panos_upgrade_assurance/check_firewall.py index 749f89f..027b896 100644 --- a/panos_upgrade_assurance/check_firewall.py +++ b/panos_upgrade_assurance/check_firewall.py @@ -868,17 +868,19 @@ def _calculate_schedule_time_diff(self, now_dt: datetime, schedule_type: str, sc """A method that calculates the time distance between two `datetime` objects. :::note - This method is used only by [`CheckFirewall.check_scheduled_updates()`](/panos/docs/panos-upgrade-assurance/api/check_firewall#checkfirewallcheck_scheduled_updates) method and it expects some information + This method is used only by [`CheckFirewall.check_scheduled_updates()`](#checkfirewallcheck_scheduled_updates) method and it expects some information to be already available. ::: # Parameters - now_dt (datetime): A `datetime` object representing the current moment in time. + now_dt (datetime): A `datetime` object representing the current moment in time. Ideally this should be the device's local + time, taken from the management plane clock. schedule_type (str): A schedule type returned by PanOS, can be one of: `every-*`, `hourly`, `daily`, `weekly`, `real-time`. - schedule (dict): Value of the `recurring` key in the API response, see [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) - documentation for details. + schedule (dict): Value of the `recurring` key in the API response, see + [`FirewallProxy.get_update_schedules()`](/panos/docs/panos-upgrade-assurance/api/firewall_proxy#firewallproxyget_update_schedules) + documentation for details. Both formats (locally configured and pushed from a Panorama template) are supported. # Raises @@ -893,7 +895,7 @@ def _calculate_schedule_time_diff(self, now_dt: datetime, schedule_type: str, sc details = "unsupported schedule type" if schedule_type == "daily": - occurrence = schedule["at"] + occurrence = schedule["at"] if isinstance(schedule["at"], str) else schedule["at"]["#text"] next_occurrence = datetime.strptime(f"{str(now_dt.date())} {occurrence}", "%Y-%m-%d %H:%M") if now_dt > next_occurrence: @@ -906,8 +908,10 @@ def _calculate_schedule_time_diff(self, now_dt: datetime, schedule_type: str, sc time_distance = 60 details = "every hour" elif schedule_type == "weekly": - occurrence_time = schedule["at"] - occurrence_day = schedule["day-of-week"] + occurrence_time = schedule["at"] if isinstance(schedule["at"], str) else schedule["at"]["#text"] + occurrence_day = ( + schedule["day-of-week"] if isinstance(schedule["day-of-week"], str) else schedule["day-of-week"]["#text"] + ) occurrence_wday = time.strptime(occurrence_day, "%A").tm_wday now_wday = now_dt.weekday() @@ -943,6 +947,9 @@ def _calculate_schedule_time_diff(self, now_dt: datetime, schedule_type: str, sc def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: """Check if any Dynamic Update job is scheduled to run within the specified time window. + When device is configured via Panorama, this includes schedules set up in Templates. It does not however include schedules + configured in `Panorama/Device Deployment/Dynamic Updates/Schedules`. + # Parameters test_window (int, optional): (defaults to 60 minutes). A time window in minutes to look for an update job occurrence. @@ -959,7 +966,7 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: value of: * [`CheckStatus.SUCCESS`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when there is no update job - planned within the test time window. + planned within the test window. * [`CheckStatus.FAIL`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) otherwise, `CheckResult.reason` field contains information about the planned jobs with next occurrence time provided if possible. * [`CheckStatus.ERROR`](/panos/docs/panos-upgrade-assurance/api/utils#class-checkstatus) when the `test_window` parameter @@ -992,24 +999,33 @@ def check_scheduled_updates(self, test_window: int = 60) -> CheckResult: schedules_in_window = [] for name, schedule in schedules.items(): - if "recurring" not in schedule.keys(): - raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration, missing a schedule..") - if len(schedule) != 1: - raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration: {schedule}") - - schedule_details = schedule["recurring"] - - if "sync-to-peer" in schedule_details: - schedule_details.pop("sync-to-peer") - - if "none" not in schedule_details: - time_distance, details = self._calculate_schedule_time_diff( - now_dt=mp_now, - schedule_type=next(iter(schedule_details.keys())), - schedule=next(iter(schedule_details.values())), - ) - if time_distance <= test_window: - schedules_in_window.append(f"{name} ({details})") + # config can come from a Template, it will have some additional keys starting with '@' + # that we would like to skip + if "@" not in name: + if "recurring" not in schedule.keys(): + raise exceptions.MalformedResponseException( + f"Schedule {name} has malformed configuration, missing a schedule.." + ) + + schedule_details = schedule["recurring"] + + # let's get rid of all keys that are not related to a schedule + for k in list(schedule_details.keys()): + if k in ["sync-to-peer", "threshold"] or k.startswith("@"): + schedule_details.pop(k) + + # we now should have a single element dict + if len(schedule_details) != 1: + raise exceptions.MalformedResponseException(f"Schedule {name} has malformed configuration: {schedule}") + + if "none" not in schedule_details: + time_distance, details = self._calculate_schedule_time_diff( + now_dt=mp_now, + schedule_type=next(iter(schedule_details.keys())), + schedule=next(iter(schedule_details.values())), + ) + if time_distance <= test_window: + schedules_in_window.append(f"{name} ({details})") if schedules_in_window: result.reason = f"Following schedules fall into test window: {', '.join(schedules_in_window)}." diff --git a/panos_upgrade_assurance/exceptions.py b/panos_upgrade_assurance/exceptions.py index c7f02cd..29b7741 100644 --- a/panos_upgrade_assurance/exceptions.py +++ b/panos_upgrade_assurance/exceptions.py @@ -37,6 +37,12 @@ class CommandRunFailedException(FirewallProxyException): pass +class GetXpathConfigFailedException(FirewallProxyException): + """Used when XAPI does not return a `success` state when running a `get` operation.""" + + pass + + class MalformedResponseException(FirewallProxyException): """A generic exception class used when a response does not meet the expected standards.""" diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index ff2dc9f..faee60c 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -37,8 +37,7 @@ def op_parser( This is just a wrapper around the [`Firewall.op()`](https://pan-os-python.readthedocs.io/en/latest/module-firewall.html#panos.firewall.Firewall.op) method. - It additionally does basic error handling and tries to extract the actual device - response. + It additionally does basic error handling and tries to extract the actual device response. # Parameters @@ -72,6 +71,47 @@ def op_parser( return resp_result + def get_parser(self, xml_path: str, return_xml: Optional[bool] = False) -> Union[dict, ET.Element]: + """Execute a configuration get command on a node, parse and return response. + + This is a wrapper around the + [`pan.xapi.get()` method](https://github.com/kevinsteves/pan-python/blob/master/doc/pan.xapi.rst#getxpathnone) + from the [`pan-python` package](https://pypi.org/project/pan-python/). + It does a basic error handling and tries to extract the actual response. + + # Parameters + + xml_path (str): An XPATH pointing to the config to be retrieved. + return_xml (bool): (defaults to `False`) When set to `True`, the return data is an \ + [`XML object`](https://docs.python.org/3/library/xml.etree.elementtree.html#xml.etree.ElementTree.Element) + instead of a Python dictionary. + + # Raises + + GetXpathConfigFailedException: This exception is raised when XPATH is not provided or does not exist. + + # Returns + dict, xml.etree.ElementTree.Element: The actual command output. A type is defined by the `return_xml` parameter. + + """ + if xml_path is None: + raise exceptions.GetXpathConfigFailedException("No XPATH provided.") + + raw_response = self.xapi.get(xml_path) + if raw_response.get("status") != "success": + raise exceptions.GetXpathConfigFailedException( + f'Failed get data under XPATH: {xml_path}, status: {raw_response.get("status")}.' + ) + + resp_result = raw_response.find("result") + if resp_result is None: + raise exceptions.GetXpathConfigFailedException(f"No data found under XPATH: {xml_path}, or path does not exist.") + + if not return_xml: + resp_result = XMLParse(ET.tostring(resp_result, encoding="utf8", method="xml"))["result"] + + return resp_result + def is_pending_changes(self) -> bool: """Get information if there is a candidate configuration pending to be committed. @@ -1084,28 +1124,53 @@ def get_certificates(self) -> dict: def get_update_schedules(self) -> dict: """Get schedules for all dynamic updates. - This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama. + This method gets scheduled dynamic updates on a device. This includes the ones pushed from Panorama, + but it does not include the ones configured via `Panorama/Device Deployment/Dynamic Updates/Schedules`. - The actual API command is `devices/entry/deviceconfig/system/update-schedule`. + The actual XMLAPI command run here is `config/get` with XPATH set to + `/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule`. # Returns dict: All dynamic updates schedules, key is the entity type to update, like: threats, wildfire, etc. - ```python showLineNumbers title="Sample output" - {'threats': {'recurring': {'weekly': {'action': 'download-only', - 'at': '01:02', - 'day-of-week': 'wednesday'}}}, - 'wildfire': {'recurring': {'real-time': None}}} + ```python showLineNumbers title="Sample output, showing values coming from Panorama" + {'@ptpl': 'lab', + '@src': 'tpl', + 'anti-virus': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'hourly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'#text': 'download-and-install', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'#text': '0', + '@ptpl': 'lab', + '@src': 'tpl'}}}}, + 'global-protect-clientless-vpn': {'@ptpl': 'lab', + '@src': 'tpl', + 'recurring': {'@ptpl': 'lab', + '@src': 'tpl', + 'weekly': {'@ptpl': 'lab', + '@src': 'tpl', + 'action': {'#text': 'download-only', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'at': {'#text': '20:00', + '@ptpl': 'lab', + '@src': 'tpl'}, + 'day-of-week': {'#text': 'wednesday', + '@ptpl': 'lab', + '@src': 'tpl'}}}} + } ``` """ - schedules = self.op_parser( - cmd="devices/entry/deviceconfig/system/update-schedule", - cmd_in_xml=True, + schedules = self.get_parser( + xml_path="/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule" ) - if schedules is None: - return {} if "update-schedule" not in schedules: return {} From 49aac4ca844e22651c2e0a4980b1d1f64fb7a6fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Tue, 22 Aug 2023 16:34:49 +0200 Subject: [PATCH 12/13] add and update tests --- file | 0 panos_upgrade_assurance/firewall_proxy.py | 2 +- tests/test_check_firewall.py | 19 ++- tests/test_firewall_proxy.py | 184 +++++++++++++++++++--- 4 files changed, 185 insertions(+), 20 deletions(-) delete mode 100644 file diff --git a/file b/file deleted file mode 100644 index e69de29..0000000 diff --git a/panos_upgrade_assurance/firewall_proxy.py b/panos_upgrade_assurance/firewall_proxy.py index faee60c..1859f9a 100644 --- a/panos_upgrade_assurance/firewall_proxy.py +++ b/panos_upgrade_assurance/firewall_proxy.py @@ -1171,7 +1171,7 @@ def get_update_schedules(self) -> dict: schedules = self.get_parser( xml_path="/config/devices/entry[@name='localhost.localdomain']/deviceconfig/system/update-schedule" ) - if "update-schedule" not in schedules: + if schedules is None or "update-schedule" not in schedules: return {} return schedules["update-schedule"] diff --git a/tests/test_check_firewall.py b/tests/test_check_firewall.py index 7703e1f..6150b35 100644 --- a/tests/test_check_firewall.py +++ b/tests/test_check_firewall.py @@ -947,7 +947,24 @@ def test__calculate_schedule_time_diff_exception(self, param_schedule_type, chec ( "2023-08-07 00:00:00", # this is Monday 120, - {"anti-virus": {"recurring": {"daily": {"action": "download-and-install", "at": "07:45"}}}}, + { + "anti-virus": { + "@ptpl": "lab", # a template provided config + "@src": "tpl", + "recurring": { + "@ptpl": "lab", + "@src": "tpl", + "daily": { + "@ptpl": "lab", + "@src": "tpl", + "action": {"#text": "download-and-install", "@ptpl": "lab", "@src": "tpl"}, + "at": {"#text": "03:30", "@ptpl": "lab", "@src": "tpl"}, + }, + "sync-to-peer": {"#text": "yes", "@ptpl": "lab", "@src": "tpl"}, + "threshold": {"#text": "15", "@ptpl": "lab", "@src": "tpl"}, + }, + } + }, CheckResult(CheckStatus.SUCCESS, ""), ), ( diff --git a/tests/test_firewall_proxy.py b/tests/test_firewall_proxy.py index a9a6b66..402852f 100644 --- a/tests/test_firewall_proxy.py +++ b/tests/test_firewall_proxy.py @@ -12,6 +12,7 @@ WrongDiskSizeFormatException, DeviceNotLicensedException, UpdateServerConnectivityException, + GetXpathConfigFailedException, ) from datetime import datetime @@ -20,6 +21,8 @@ def fw_proxy_mock(): fw_proxy_obj = FirewallProxy() fw_proxy_obj.op = MagicMock() + fw_proxy_obj.generate_xapi = MagicMock() + fw_proxy_obj.xapi.get = MagicMock() yield fw_proxy_obj @@ -75,6 +78,73 @@ def test_op_parser_none(self, fw_proxy_mock): fw_proxy_mock.op.assert_called_with(cmd, xml=False, cmd_xml=True, vsys=fw_proxy_mock.vsys) + def test_get_parser_correct_response_defaults(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = """ + + + value + + + """ + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + response = fw_proxy_mock.get_parser(input_xpath) + mocked_response = xml_parse(ET.tostring(xml_output.find("result"), encoding="utf8", method="xml"))["result"] + + assert response == mocked_response + + def test_get_parser_correct_response_in_xml(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = """ + + + value + + + """ + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + response = fw_proxy_mock.get_parser(input_xpath, True) + mocked_response = xml_output.find("result") + + assert response == mocked_response + + def test_get_parser_no_xpath_exception(self, fw_proxy_mock): + with pytest.raises(GetXpathConfigFailedException) as exc_info: + fw_proxy_mock.get_parser(None) + assert "No XPATH provided." in str(exc_info.value) + + def test_get_parser_incorrect_response(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = """ + + + + """ + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + with pytest.raises(GetXpathConfigFailedException) as exc_info: + fw_proxy_mock.get_parser(input_xpath) + + expected = f'Failed get data under XPATH: {input_xpath}, status: {xml_output.get("status")}.' + assert expected == str(exc_info.value) + + def test_get_parser_no_response(self, fw_proxy_mock): + input_xpath = "/some/xpath" + xml_output_text = '' + xml_output = ET.fromstring(xml_output_text) + fw_proxy_mock.xapi.get.return_value = xml_output + + with pytest.raises(GetXpathConfigFailedException) as exc_info: + fw_proxy_mock.get_parser(input_xpath) + + expected = f"No data found under XPATH: {input_xpath}, or path does not exist." + assert expected == str(exc_info.value) + def test_is_pending_changes_true(self, fw_proxy_mock): xml_text = "yes" raw_response = ET.fromstring(xml_text) @@ -1236,36 +1306,114 @@ def test_get_certificates_no_certificate(self, fw_proxy_mock): def test_get_update_schedules(self, fw_proxy_mock): xml_text = """ - - - - - - yes - - 15:30 - download-and-install + + + + + + 15 + + 00:30 + download-and-install + yes - - + + + + + 4 + download-only + yes + + + + + + + + + + + + + + - 07:45 + 01:45 download-and-install - + + + + + wednesday + 01:02 + download-only + + + """ raw_response = ET.fromstring(xml_text) - fw_proxy_mock.op.return_value = raw_response + fw_proxy_mock.xapi.get.return_value = raw_response + # fw_proxy_mock.op.return_value = raw_response + # fw_proxy_mock.get_parser.return_value = raw_response response = { - "anti-virus": {"recurring": {"daily": {"action": "download-and-install", "at": "07:45"}}}, - "threats": {"recurring": {"daily": {"action": "download-and-install", "at": "15:30"}, "sync-to-peer": "yes"}}, + "@ptpl": "lab", + "@src": "tpl", + "anti-virus": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": { + "@ptpl": "lab", + "@src": "tpl", + "daily": { + "@ptpl": "lab", + "@src": "tpl", + "action": {"#text": "download-and-install", "@ptpl": "lab", "@src": "tpl"}, + "at": {"#text": "00:30", "@ptpl": "lab", "@src": "tpl"}, + }, + "sync-to-peer": {"#text": "yes", "@ptpl": "lab", "@src": "tpl"}, + "threshold": {"#text": "15", "@ptpl": "lab", "@src": "tpl"}, + }, + }, + "global-protect-clientless-vpn": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": {"daily": {"action": "download-and-install", "at": "01:45"}}, + }, + "global-protect-datafile": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": {"@ptpl": "lab", "@src": "tpl", "none": {"@ptpl": "lab", "@src": "tpl"}}, + }, + "threats": {"recurring": {"weekly": {"action": "download-only", "at": "01:02", "day-of-week": "wednesday"}}}, + "wf-private": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": {"@ptpl": "lab", "@src": "tpl", "none": {"@ptpl": "lab", "@src": "tpl"}}, + }, + "wildfire": { + "@ptpl": "lab", + "@src": "tpl", + "recurring": { + "@ptpl": "lab", + "@src": "tpl", + "every-15-mins": { + "@ptpl": "lab", + "@src": "tpl", + "action": {"#text": "download-only", "@ptpl": "lab", "@src": "tpl"}, + "at": {"#text": "4", "@ptpl": "lab", "@src": "tpl"}, + "sync-to-peer": {"#text": "yes", "@ptpl": "lab", "@src": "tpl"}, + }, + }, + }, } assert fw_proxy_mock.get_update_schedules() == response @@ -1278,7 +1426,7 @@ def test_get_update_schedules_empty_response(self, fw_proxy_mock): """ raw_response = ET.fromstring(xml_text) - fw_proxy_mock.op.return_value = raw_response + fw_proxy_mock.xapi.get.return_value = raw_response assert fw_proxy_mock.get_update_schedules() == {} @@ -1292,6 +1440,6 @@ def test_get_update_schedules_no_update_schedules_key(self, fw_proxy_mock): """ raw_response = ET.fromstring(xml_text) - fw_proxy_mock.op.return_value = raw_response + fw_proxy_mock.xapi.get.return_value = raw_response assert fw_proxy_mock.get_update_schedules() == {} From a29352c72ac9a0990912308777b0e06197171f2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Pawl=C4=99ga?= Date: Tue, 29 Aug 2023 15:15:48 +0200 Subject: [PATCH 13/13] pushing changes from comments --- examples/low_level_methods/run_low_level_methods.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/low_level_methods/run_low_level_methods.py b/examples/low_level_methods/run_low_level_methods.py index b44f0e2..84e95fc 100755 --- a/examples/low_level_methods/run_low_level_methods.py +++ b/examples/low_level_methods/run_low_level_methods.py @@ -119,7 +119,6 @@ print(f"\n certificates: {firewall.get_certificates()}") print(f"\n dynamic schedules: {firewall.get_update_schedules()}") - pprint(firewall.get_update_schedules()) print(f"\n jobs: {firewall.get_jobs()}")