Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 39 additions & 30 deletions fortigate_api/fortigate_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

from __future__ import annotations

import ipaddress
import json
import logging
import re
from typing import Callable, Iterable, Optional
from urllib.parse import urlencode, urljoin
from urllib.parse import urlencode, urljoin, urlunparse

import requests
from requests import Session, Response
Expand All @@ -20,6 +21,7 @@
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

HTTPS = "https"
HTTP = "http"
PORT_443 = 443
PORT_80 = 80
TIMEOUT = 15
Expand Down Expand Up @@ -59,7 +61,7 @@ def __init__(self, **kwargs):
:param bool logging_error: Logging only the REST API response with error.
`True` - Enable errors logging, `False` - otherwise. Default is `False`.
"""
self.host = str(kwargs.get("host"))
self.host = _init_host(**kwargs)
self.username = str(kwargs.get("username"))
self.password = str(kwargs.get("password"))
self.token = _init_token(**kwargs)
Expand All @@ -81,7 +83,7 @@ def __repr__(self):
host = self.host
username = self.username
scheme = self.scheme
port = self.port if not (scheme == HTTPS and self.port == PORT_443) else ""
port = self.port
timeout = self.timeout
verify = self.verify
vdom = self.vdom
Expand Down Expand Up @@ -123,11 +125,7 @@ def is_connected(self) -> bool:
@property
def url(self) -> str:
"""Return URL to the Fortigate."""
if self.scheme == HTTPS and self.port == 443:
return f"{self.scheme}://{self.host}"
if self.scheme == "http" and self.port == 80:
return f"{self.scheme}://{self.host}"
return f"{self.scheme}://{self.host}:{self.port}"
return urlunparse((self.scheme, f"{self.host}:{self.port}", "/", "", "", ""))

# ============================ login =============================

Expand All @@ -145,7 +143,7 @@ def login(self) -> None:
if self.token:
try:
response: Response = session.get(
url=f"{self.url}/api/v2/monitor/system/status",
url=urljoin(self.url, "/api/v2/monitor/system/status"),
headers=self._bearer_token(),
verify=self.verify,
)
Expand All @@ -158,7 +156,7 @@ def login(self) -> None:
# password
try:
response = session.post(
url=f"{self.url}/logincheck",
url=urljoin(self.url, "/logincheck"),
data=urlencode([("username", self.username), ("secretkey", self.password)]),
timeout=self.timeout,
verify=self.verify,
Expand All @@ -182,13 +180,12 @@ def logout(self) -> None:
if not self.token:
try:
self._session.get(
url=f"{self.url}/logout",
url=urljoin(self.url, "/logout"),
timeout=self.timeout,
verify=self.verify,
)
except SSLError:
pass
del self._session
self._session = None

# =========================== helpers ============================
Expand Down Expand Up @@ -251,7 +248,7 @@ def _init_port(self, **kwargs) -> int:
"""Init port, 443 for scheme=`https`, 80 for scheme=`http`."""
if port := int(kwargs.get("port") or 0):
return port
if self.scheme == "http":
if self.scheme == HTTP:
return PORT_80
return PORT_443

Expand Down Expand Up @@ -281,7 +278,7 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response:
:rtype: Response
"""
params: DAny = {
"url": self._valid_url(url),
"url": urljoin(self.url, url),
"params": urlencode([("vdom", self.vdom)]),
"timeout": self.timeout,
"verify": self.verify,
Expand All @@ -299,36 +296,48 @@ def _response(self, method: Method, url: str, data: ODAny = None) -> Response:
raise self._hide_secret_ex(ex)
return response

def _valid_url(self, url: str) -> str:
"""Return a valid URL string.

Add `https://` to `url` if it is absent and remove trailing `/` character.
"""
if re.match("http(s)?://", url):
return url.rstrip("/")
path = url.strip("/")
return urljoin(self.url, path)


# =========================== helpers ============================


def _init_scheme(**kwargs) -> str:
"""Init scheme `https` or `http`."""
scheme = str(kwargs.get("scheme") or HTTPS)
expected = ["https", "http"]
expected = [HTTPS, HTTP]
if scheme not in expected:
raise ValueError(f"{scheme=}, {expected=}.")
raise ValueError(f"{scheme=!r}, {expected=!r}.")
return scheme


def _init_host(**kwargs) -> str:
"""Init host valid hostname or valid IP"""
host = str(kwargs.get("host"))
if not host:
raise ValueError(f"{host=!r}, hostname is not specified.")
if len(host) > 255:
raise ValueError(f"{host=!r}, hostname is too long.")
if host.startswith('[') and host.endswith(']'):
host = host[1:-1]
if re.fullmatch(r"\d+\.\d+\.\d+\.\d+", host) or ":" in host:
try:
ip = ipaddress.ip_address(host)
if isinstance(ip, ipaddress.IPv6Address):
return f"[{host}]"
return host
except ValueError:
raise ValueError(f"{host=!r}, not a valid ip address")
if host[-1] == ".":
host = host[:-1]
allowed = re.compile(r"^(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)
if all(allowed.match(label) for label in host.split(".")):
return host
raise ValueError(f"{host=!r}, hostname is not RFC compliant.")

def _init_token(**kwargs) -> str:
"""Init token."""
token = str(kwargs.get("token") or "")
if not token:
return ""
if kwargs.get("username"):
raise ValueError("Mutually excluded: username, token.")
raise ValueError("A username and a token are mutually exclusive.")
if kwargs.get("password"):
raise ValueError("Mutually excluded: password, token.")
raise ValueError("A password and a token are mutually exclusive.")
return token
99 changes: 61 additions & 38 deletions tests/test__fortigate_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,27 @@ def api() -> FortiGate:
@pytest.mark.parametrize("kwargs, expected", [
# username password
(dict(host="a", username="b", password="c"),
"FortiGate(host='a', username='b')"),
"FortiGate(host='a', username='b', port=443)"),
(dict(host="a", username="b", password="c", scheme="https", port=443, timeout=15, vdom="root"),
"FortiGate(host='a', username='b')"),
"FortiGate(host='a', username='b', port=443)"),
(dict(host="a", username="b", password="c", port=80),
"FortiGate(host='a', username='b', port=80)"),
(dict(host="a", username="b", password="c", scheme="https", port=80),
"FortiGate(host='a', username='b', port=80)"),
(dict(host="a", username="b", password="c", scheme="http", port=80),
"FortiGate(host='a', username='b', scheme='http', port=80)"),
(dict(host="a", username="b", password="c", timeout=1),
"FortiGate(host='a', username='b', timeout=1)"),
"FortiGate(host='a', username='b', port=443, timeout=1)"),
(dict(host="a", username="b", password="c", vdom="d"),
"FortiGate(host='a', username='b', vdom='d')"),
"FortiGate(host='a', username='b', port=443, vdom='d')"),
(dict(host="a", username="b", password="c", vdom="d", timeout=1, port=80),
"FortiGate(host='a', username='b', port=80, timeout=1, vdom='d')"),
(dict(host="a", username="b", password="c", verify=True),
"FortiGate(host='a', username='b', verify=True)"),
"FortiGate(host='a', username='b', port=443, verify=True)"),
(dict(host="a", username="b", password="c", verify=False),
"FortiGate(host='a', username='b')"),
"FortiGate(host='a', username='b', port=443)"),
# token
(dict(host="a", token="b"), "FortiGate(host='a', username='')"),
(dict(host="a", token="b"), "FortiGate(host='a', username='', port=443)"),
])
def test__repr__(kwargs, expected):
"""FortiGateBase.__repr__()"""
Expand Down Expand Up @@ -82,14 +82,14 @@ def test__is_connected(api: FortiGate, session, expected):


@pytest.mark.parametrize("scheme, host, port, expected", [
("https", "host", 80, "https://host:80"),
("https", "127.0.0.255", 80, "https://127.0.0.255:80"),
("https", "host", 443, "https://host"),
("https", "127.0.0.255", 443, "https://127.0.0.255"),
("http", "host", 80, "http://host"),
("http", "127.0.0.255", 80, "http://127.0.0.255"),
("http", "host", 443, "http://host:443"),
("http", "127.0.0.255", 443, "http://127.0.0.255:443"),
("https", "host", 80, "https://host:80/"),
("https", "127.0.0.255", 80, "https://127.0.0.255:80/"),
("https", "host", 443, "https://host:443/"),
("https", "127.0.0.255", 443, "https://127.0.0.255:443/"),
("http", "host", 80, "http://host:80/"),
("http", "127.0.0.255", 80, "http://127.0.0.255:80/"),
("http", "host", 443, "http://host:443/"),
("http", "127.0.0.255", 443, "http://127.0.0.255:443/"),
])
def test__url(scheme, host, port, expected):
"""FortiGateBase.url()"""
Expand All @@ -110,9 +110,9 @@ def test__login(token, expected, headers):
api = FortiGate(host="HOST", token=token)
with requests_mock.Mocker() as mock:
if token:
mock.get("https://host/api/v2/monitor/system/status")
mock.get("https://host:443/api/v2/monitor/system/status")
else:
mock.post("https://host/logincheck")
mock.post("https://host:443/logincheck")

with patch("fortigate_api.FortiGate._get_token_from_cookies", return_value=expected):
api.login()
Expand Down Expand Up @@ -199,27 +199,6 @@ def test__init_port(api: FortiGate, kwargs, scheme, expected):
with pytest.raises(expected):
api._init_port(**kwargs)


@pytest.mark.parametrize("kwargs, url, expected", [
({}, QUERY, f"https://host/{QUERY}"),
({}, f"/{QUERY}/", f"https://host/{QUERY}"),
({}, f"https://host/{QUERY}", f"https://host/{QUERY}"),
({}, f"https://host/{QUERY}/", f"https://host/{QUERY}"),
({"port": 80}, QUERY, f"https://host:80/{QUERY}"),
({"port": 80}, f"https://host:80/{QUERY}", f"https://host:80/{QUERY}"),
({"scheme": "http", "port": 80}, QUERY, f"http://host/{QUERY}"),
({"scheme": "http", "port": 80}, f"http://host/{QUERY}", f"http://host/{QUERY}"),
])
def test__valid_url(kwargs, url, expected):
"""FortiGateBase._valid_url()"""
default = dict(host="host", username="username", password="", port=443)
kwargs_ = {**default, **kwargs}
fgt = FortiGate(**kwargs_)

actual = fgt._valid_url(url=url)
assert actual == expected


# =========================== helpers ============================

@pytest.mark.parametrize("kwargs, expected", [
Expand All @@ -240,6 +219,50 @@ def test__init_scheme(kwargs, expected):
fortigate_base._init_scheme(**kwargs)


@pytest.mark.parametrize("kwargs, expected", [
(dict(host=""), ValueError),
(dict(host=" "), ValueError),
(dict(host=":"), ValueError),
(dict(host=":80"), ValueError),
(dict(host="//"), ValueError),
(dict(host="/path"), ValueError),
(dict(host="///host"), ValueError),
(dict(host="https://example.com"), ValueError),
(dict(host="host..com"), ValueError),
(dict(host="hostname-.com"), ValueError),
(dict(host="ho$stname.com"), ValueError),
(dict(host="host name.com"), ValueError),
(dict(host="hostname." + "a" * 256), ValueError),
(dict(host="256.256.256.256"), ValueError),
(dict(host="192.168.1.999"), ValueError),
(dict(host="[2001:db8:::1]"), ValueError),
(dict(host="[2001:db8::1"), ValueError),
(dict(host="2001:db8::1]"), ValueError),
(dict(host="host<>name.com"), ValueError),
(dict(host="host|name.com"), ValueError),
(dict(host="host^name.com"), ValueError),
(dict(host="host/name.com"), ValueError),
(dict(host="host\\name.com"), ValueError),
(dict(host="localhost:"), ValueError),
(dict(host="localhost:abc"), ValueError),
(dict(host="localhost:99999"), ValueError),
(dict(host="[::1]:999999"), ValueError),
(dict(host="[2001:db8::1]"), "[2001:db8::1]"),
(dict(host="2001:db8::1"), "[2001:db8::1]"),
(dict(host="192.168.1.1"), "192.168.1.1"),
(dict(host="example.com"), "example.com"),
(dict(host="sub.example.com"), "sub.example.com"),
])
def test__init_host(kwargs, expected):
"""FortiGateBase._init_host()"""
if isinstance(expected, str):
actual = fortigate_base._init_host(**kwargs)
assert actual == expected
else:
with pytest.raises(expected):
fortigate_base._init_host(**kwargs)


@pytest.mark.parametrize("kwargs, expected", [
({}, ""),
(dict(token="token"), "token"),
Expand Down