generated from antibagr/Any-Python-Template
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
38 changed files
with
2,019 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
ENVIRONMENT=development | ||
DEBUG=true | ||
TEST_SSL_CONTAINER_NAME=testssl.sh | ||
TEST_SSL_OUTPUT_FILE=output.json | ||
TEST_SSL_INPUT_FILE=input.txt | ||
TEST_SSL_DATA_DIR=/data | ||
CLICKHOUSE_HOST=clickhouse | ||
CLICKHOUSE_PORT=8123 | ||
CLICKHOUSE_USER=default | ||
CLICKHOUSE_PASSWORD=clickhouse_password | ||
CLICKHOUSE_TABLE_NAME=ssl_check | ||
CLICKHOUSE_DB=default |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
.idea/ | ||
.env | ||
|
||
# pyenv | ||
.python-version | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import signal | ||
import typing as t | ||
|
||
import click | ||
|
||
from app.services.service import application_dependencies, ssl_checker_service | ||
|
||
|
||
@click.group() | ||
def cli() -> None: | ||
... | ||
|
||
|
||
def handle_exit_signal(_sig, _frame) -> t.NoReturn: | ||
raise SystemExit | ||
|
||
|
||
@click.command() | ||
def run() -> None: | ||
with application_dependencies(): | ||
ssl_checker_service.run() | ||
|
||
|
||
cli.add_command(run) | ||
signal.signal(signal.SIGINT, handle_exit_signal) | ||
signal.signal(signal.SIGTERM, handle_exit_signal) | ||
|
||
if __name__ == "__main__": | ||
cli() |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
import typing as t | ||
|
||
TestSSLRecord = t.TypedDict( | ||
"Record", | ||
{ | ||
"id": str, | ||
"ip": str, | ||
"port": str, | ||
"severity": str, | ||
"finding": str, | ||
}, | ||
) | ||
|
||
TestSSLRecords = list[TestSSLRecord] | ||
Domain = t.NewType("Domain", str) |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from __future__ import annotations | ||
|
||
import datetime as dt | ||
import typing as t | ||
|
||
import bson | ||
import pydantic | ||
from bson.errors import InvalidId | ||
|
||
|
||
class ObjectId(bson.ObjectId): | ||
@classmethod | ||
def __get_validators__(cls): | ||
yield cls.validate | ||
|
||
@classmethod | ||
def validate(cls, v: t.Any) -> ObjectId: | ||
try: | ||
return cls(v) | ||
except InvalidId as exc: | ||
raise ValueError(f"{v} is not a valid ObjectId") from exc | ||
|
||
|
||
class BaseModel(pydantic.BaseModel): | ||
model_config = pydantic.ConfigDict( | ||
validate_assignment=True, | ||
json_encoders={ | ||
dt.datetime: dt.datetime.isoformat, | ||
ObjectId: str, | ||
set: list, | ||
}, | ||
) | ||
|
||
@property | ||
def is_empty(self) -> bool: | ||
return None in self.model_dump(warnings=False).values() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
import typing as t | ||
|
||
from pydantic_mongo import AbstractRepository | ||
|
||
from app.dto.entities.base import ObjectId | ||
from app.dto.entities.fqdn import FQDN | ||
|
||
|
||
# @t.final | ||
class MongoFQDN(FQDN): | ||
id: ObjectId = None | ||
|
||
|
||
class FQDNRepository(AbstractRepository[MongoFQDN]): | ||
class Meta: | ||
collection_name = "fqdns" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import datetime as dt | ||
import typing as t | ||
|
||
from pydantic import ConfigDict, Field, field_serializer | ||
|
||
from app.dto.annotations import Domain | ||
from app.dto.entities.base import BaseModel | ||
|
||
|
||
class FQDN(BaseModel): | ||
model_config = ConfigDict(from_attributes=True) | ||
fqdn: str = Field(..., alias="fqdn") | ||
alt_names: set[Domain] = Field(..., alias="alt_names", default_factory=set) | ||
supported_protocols: list[str] = Field(..., alias="supported_protocols", default_factory=list) | ||
|
||
@field_serializer("alt_names") | ||
def serialize_alt_names(self, value: set[Domain]) -> list[str]: | ||
return list(value) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
import enum | ||
import ssl | ||
import typing as t | ||
|
||
|
||
@t.final | ||
@enum.unique | ||
class ProtocolVersion(enum.IntEnum): | ||
# SSLv2 = ssl.PROTOCOL_SSLv23 | ||
# SSLv3 = ssl.PROTOCOL_SSLv23 | ||
TLSv1 = ssl.PROTOCOL_TLSv1 | ||
TLSv1_1 = ssl.PROTOCOL_TLSv1_1 | ||
TLSv1_2 = ssl.PROTOCOL_TLSv1_2 | ||
TLSv1_3 = ssl.PROTOCOL_TLS |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import io | ||
import json | ||
import pathlib | ||
import tarfile | ||
|
||
import docker | ||
import requests | ||
from docker.models.containers import Container | ||
from loguru import logger | ||
|
||
from app.dto.annotations import TestSSLRecord | ||
|
||
|
||
class TestSSLContainer: | ||
def __init__( | ||
self, | ||
*, | ||
client: docker.DockerClient, | ||
container_name: str, | ||
output_path: pathlib.Path, | ||
) -> None: | ||
self._client = client | ||
self._container_name = container_name | ||
self._output_path = output_path | ||
|
||
@property | ||
def container(self) -> Container: | ||
return self._client.containers.get(self._container_name) | ||
|
||
def stop(self) -> None: | ||
self.container.stop() | ||
|
||
def wait_for_complete(self) -> None: | ||
try: | ||
logger.info("Waiting for container to complete") | ||
self.container.wait() | ||
except requests.exceptions.ReadTimeout: | ||
logger.error("Container timeout") | ||
self.container.kill() | ||
|
||
def get_json(self) -> list[TestSSLRecord]: | ||
tar_gz, _ = self.container.get_archive(self._output_path, encode_stream=True) | ||
|
||
with tarfile.open( | ||
fileobj=io.BytesIO(b"".join(tar_gz)), | ||
mode="r", | ||
) as tar: | ||
try: | ||
json_bytes: bytes = tar.extractfile(self._output_path.name).read() | ||
except KeyError: | ||
logger.error("No file in tar archive") | ||
return [] | ||
return json.loads(json_bytes) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import typing as t | ||
|
||
from loguru import logger | ||
|
||
from app.dto.annotations import TestSSLRecords | ||
from app.dto.entities.fqdn import FQDN | ||
|
||
Protocols = ("SSLv2", "SSLv3", "TLS1", "TLS1_1", "TLS1_2", "TLS1_3") | ||
|
||
logger.bind(context="TestSSL") | ||
|
||
|
||
class JsonParserWarning(Warning): | ||
""" | ||
Warning indicating that the JSON parser has encountered an error. | ||
""" | ||
|
||
|
||
class TestSSLJsonParser: | ||
def __init__(self) -> None: | ||
self._data = [] | ||
|
||
def set_data(self, *, data: TestSSLRecords) -> None: | ||
self._data = data | ||
|
||
def parse(self) -> t.Iterable[FQDN]: | ||
fqdns: dict[str, FQDN] = {} | ||
|
||
for record in self._data: | ||
logger.bind(ip=record["ip"]) | ||
|
||
try: | ||
fqdn = record["ip"].split("/")[0] | ||
except Exception as exc: | ||
logger.warning(f"Failed to parse FQDN from {record['ip']}: {exc}") | ||
continue | ||
|
||
if not fqdn: | ||
continue | ||
|
||
if fqdn not in fqdns: | ||
fqdns[fqdn] = FQDN(fqdn=fqdn) | ||
|
||
if record["id"] in Protocols: | ||
if record["finding"] != "not offered": | ||
fqdns[fqdn].supported_protocols.append(record["id"]) | ||
elif record["id"].startswith("cert_subjectAltName"): | ||
fqdns[fqdn].alt_names |= set(record["finding"].split()) | ||
|
||
return iter(fqdns.values()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from app.repository.db.db import DB | ||
|
||
__all__ = [ | ||
"DB", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
import types | ||
import typing as t | ||
|
||
from loguru import logger | ||
from pymongo.mongo_client import MongoClient | ||
|
||
from app.settings import settings | ||
|
||
|
||
class BaseDB: | ||
def __enter__(self) -> t.Self: | ||
return self | ||
|
||
def __exit__( | ||
self, | ||
_exc_type: type[BaseException], | ||
_value: BaseException, | ||
_traceback: types.TracebackType, | ||
) -> None: | ||
... | ||
|
||
def connect(self) -> None: | ||
logger.bind(context=self.__class__.__name__).info("database_connected") | ||
|
||
def disconnect(self) -> None: | ||
logger.bind(context=self.__class__.__name__).info("database_disconnected") | ||
|
||
def is_alive(self) -> bool: | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from clickhouse_connect.driver.client import Client as ClickHouseClient | ||
|
||
from app.repository.db.base import BaseDB | ||
|
||
|
||
class ClickHouseDB(BaseDB): | ||
def __init__( | ||
self, | ||
*, | ||
client: ClickHouseClient, | ||
table_name: str, | ||
) -> None: | ||
super().__init__() | ||
self._client = client | ||
self._table = table_name | ||
|
||
def connect(self) -> None: | ||
self._client.command( | ||
"""CREATE TABLE IF NOT EXISTS {table:Identifier} ( | ||
`fqdn` String, | ||
`alt_names` Array(String), | ||
`supported_protocols` Array(String) | ||
) | ||
ENGINE = ReplacingMergeTree | ||
PRIMARY KEY (fqdn) | ||
ORDER BY (fqdn); | ||
""", | ||
parameters={"table": self._table}, | ||
) | ||
|
||
def disconnect(self) -> None: | ||
return self._client.close() | ||
|
||
def is_alive(self) -> bool: | ||
return self._client.ping() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from app.repository.db.fqdn import ClickHouseFQDNDB | ||
|
||
|
||
class DB( | ||
ClickHouseFQDNDB, | ||
): | ||
... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from clickhouse_connect.driver.client import Client as ClickHouseClient | ||
from clickhouse_connect.driver.insert import InsertContext | ||
from loguru import logger | ||
from pymongo.mongo_client import MongoClient | ||
|
||
from app.dto.entities.collections import FQDNRepository, MongoFQDN | ||
from app.dto.entities.fqdn import FQDN | ||
from app.repository.db.clickhouse import ClickHouseDB | ||
from app.repository.db.mongo import MongoDB | ||
|
||
|
||
class MongoFQDNDB(MongoDB): | ||
def __init__(self, *, client: MongoClient, database: str) -> None: | ||
super().__init__(client=client, database=database) | ||
self._repository = FQDNRepository(database=self._database) | ||
|
||
def save_fqdn(self, *, fqdn: MongoFQDN) -> None: | ||
self._repository.save(fqdn) | ||
|
||
|
||
class ClickHouseFQDNDB(ClickHouseDB): | ||
def __init__( | ||
self, | ||
*, | ||
client: ClickHouseClient, | ||
table_name: str, | ||
) -> None: | ||
super().__init__(client=client, table_name=table_name) | ||
self._context = None | ||
|
||
@property | ||
def context(self) -> InsertContext: | ||
if self._context is None: | ||
self._context = self._client.create_insert_context( | ||
table=self._table, | ||
column_names=["fqdn", "alt_names", "supported_protocols"], | ||
) | ||
return self._context | ||
|
||
def save_fqdn(self, *, fqdn: FQDN) -> None: | ||
data = [list(fqdn.model_dump().values())] | ||
self.context.data = data | ||
self._client.insert(context=self.context) |
Oops, something went wrong.