Skip to content

Commit

Permalink
refactor: applied all suggestions of ruff linter and ruff formater
Browse files Browse the repository at this point in the history
A ruff config was also added with all the rules that sounded interesting.
  • Loading branch information
mjaepel committed Nov 23, 2024
1 parent 240667b commit 3837434
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 133 deletions.
30 changes: 30 additions & 0 deletions .ruff.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
line-length = 140
indent-width = 4

target-version = "py311"

[lint]
select = ["E", "F", "B", "Q", "W", "I", "N", "UP", "ANN", "S", "FBT", "A", "COM", "C4", "DTZ", "EXE", "FA", "ICN", "LOG", "G", "INP", "PIE", "PYI", "PT", "RSE", "RET", "SLF", "SLOT", "SIM", "TID", "INT", "ARG", "PTH", "TD", "ERA", "PL", "TRY", "PERF", "RUF"]
ignore = [
"G004", # fallback to string formatting syntax while every where f-strings are used, doesn't make sense.
"TRY400", # wtf? we use try-except for NOT printing the full trace ... this would be idiotic to use logging.exception() there :D
"ANN101", # already removed from newer ruff versions
"PERF401", # dont agree with "more readable". Performance difference is negligible
"RET506", # Waiting for https://github.com/astral-sh/ruff/discussions/12468
"COM812" # disabling suggested by ruff formater.
]

fixable = ["ALL"]
unfixable = []

[lint.pylint]
allow-magic-value-types = ["str", "bytes", "float", "int"]

[lint.pep8-naming]
classmethod-decorators = ["field_validator", "model_validator"]

[format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "lf"
118 changes: 64 additions & 54 deletions modules/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from pydantic import BaseModel, ConfigDict, model_validator, field_validator
from typing import List, Optional
from argparse import ArgumentParser
import logging
from argparse import ArgumentParser
from pathlib import Path

import yaml
import pathlib
from pydantic import BaseModel, ConfigDict, field_validator, model_validator

from modules.exceptions import NotAllowedValueError, UnknownConfigAttributeError, UnknownLogLevelError, ZabbixAuthConfigError
from modules.logger import get_logger, set_log_level

###################################################################

Expand All @@ -22,99 +25,96 @@


class GeneralConfig(BaseModel):
loglevel: Optional[str] = "INFO"
loglevel_numeric: Optional[int] = logging.INFO
dryrun: Optional[bool] = False
max_threads: Optional[int] = 10
encryption: Optional[bool] = False
encryption_key: Optional[str]
loglevel: str | None = "INFO"
loglevel_numeric: int | None = logging.INFO
dryrun: bool | None = False
max_threads: int | None = 10
encryption: bool | None = False
encryption_key: str | None

@model_validator(mode="before")
def convert_loglevel(cls, values):
def convert_loglevel(cls, values: dict) -> dict:
loglevel = values.get("loglevel")

if loglevel:
if loglevel.upper() not in loglevel_map:
raise ValueError(f"Invalid loglevel: {loglevel}")
raise UnknownLogLevelError(loglevel, loglevel_map.keys())

values["loglevel_numeric"] = loglevel_map[loglevel.upper()]
return values


class ZabbixAuthConfig(BaseModel):
user: Optional[str] = None
password: Optional[str] = None
token: Optional[str] = None
user: str | None = None
password: str | None = None
token: str | None = None

@model_validator(mode="before")
def check_auth_fields(cls, values):
user = values.get("user")
password = values.get("password")
token = values.get("token")

if token:
if user or password:
raise ValueError("If 'token' is defined, 'user' and 'password' must not be set.")
else:
if not (user and password):
raise ValueError("If 'token' is not defined, both 'user' and 'password' must be set.")
def check_auth_fields(cls, values: dict) -> dict:
if values.get("token") and (values.get("user") or values.get("password")):
logger = get_logger()
logger.warning("Both 'token' and 'user'/'password' are set. Using 'token' for authentication.")
values["user"] = None
values["password"] = None
elif not values.get("token") and not (values.get("user") and values.get("password")):
raise ZabbixAuthConfigError

return values


class ZabbixConfig(BaseModel):
url: str
auth: ZabbixAuthConfig
export_format: Optional[str] = "yaml"
export_format: str | None = "yaml"

@field_validator("export_format")
def check_export_format(cls, value):
def check_export_format(cls, value: str) -> str:
allowed_formats = {"yaml", "json", "xml"}
if value not in allowed_formats:
raise ValueError(f"Allowed values are: {', '.join(allowed_formats)}.")
raise NotAllowedValueError(value, allowed_formats)
return value


class TemplatesConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class TemplategroupsConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class HostsConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class HostgroupsConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class MapsConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class ImagesConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class MediatypesConfig(BaseModel):
enable: bool
encryption: Optional[bool] = False
excludes: Optional[List[str]] = []
encryption: bool | None = False
excludes: list[str] | None = []


class InputsConfig(BaseModel):
Expand Down Expand Up @@ -147,31 +147,41 @@ class Configuration(BaseModel):

###################################################################

class ConfigParser():
def __init__(self):
self.config_file = pathlib.Path(__file__).parent.parent / "config.yaml"
self.argsparser = ArgumentParser()
self.argsparser.add_argument("-c", "--config", default=self.config_file, help=f"Configuration file (default: {self.config_file})", required=False)

def load_data(self):
class ConfigParser:
def __init__(self) -> None:
self.config_file = Path(__file__).parent.parent / "config.yaml"
self.argsparser = ArgumentParser()
self.argsparser.add_argument(
"-c",
"--config",
default=self.config_file,
help=f"Configuration file (default: {self.config_file})",
required=False,
)

def load_data(self) -> Configuration:
self.args = self.argsparser.parse_args()
self.config_file = self.args.config
with open(self.config_file, "r") as file:
with Path.open(self.config_file, "r") as file:
config_dict = yaml.safe_load(file)

config_data = Configuration.model_validate(config_dict)
set_log_level(config_data.general.loglevel_numeric)

return config_data

def __getattr__(self, name: str):
def __getattr__(self, name: str) -> BaseModel:
config_data = self.load_data()
if hasattr(config_data, name):
return getattr(config_data, name)
raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
raise UnknownConfigAttributeError(name)

def __repr__(self):
def __repr__(self) -> str:
return str(self.load_data().model_dump())

def add_argument(self, *args, **kwargs):
def add_argument(self, *args: int, **kwargs: int) -> None:
self.argsparser.add_argument(*args, **kwargs)


config = ConfigParser()
8 changes: 4 additions & 4 deletions modules/crypto.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from cryptography.fernet import Fernet
import hashlib
import base64
import hashlib

from cryptography.fernet import Fernet


def convert_key(key: str) -> bytes:
input_bytes = key.encode()
sha256_hash = hashlib.sha256(input_bytes).digest()
fernet_key = base64.urlsafe_b64encode(sha256_hash)
return fernet_key
return base64.urlsafe_b64encode(sha256_hash)


def encrypt(content: str, key: str) -> bytes:
Expand Down
29 changes: 29 additions & 0 deletions modules/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
class NotAllowedValueError(Exception):
def __init__(self, value: any, allowed_values: list[any]) -> None:
self.value = value
self.allowed_values = allowed_values

def __str__(self) -> str:
return f"Value '{self.value}' is not allowed. Allowed values are: {', '.join(self.allowed_values)}."


class UnknownLogLevelError(Exception):
def __init__(self, value: any, allowed_values: list) -> None:
self.value = value
self.allowed_values = allowed_values

def __str__(self) -> str:
return f"Log level '{self.value}' is unknown. Allowed values are: {', '.join(self.allowed_values)}."


class UnknownConfigAttributeError(Exception):
def __init__(self, attribute: str) -> None:
self.attribute = attribute

def __str__(self) -> str:
return f"Configuration has no attribute '{self.attribute}'"


class ZabbixAuthConfigError(Exception):
def __str__(self) -> str:
return "You have to set token or username/password for Zabbix API"
Loading

0 comments on commit 3837434

Please sign in to comment.