Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce gallia.net for networking functions #642

Merged
merged 8 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 0 additions & 53 deletions .github/workflows/codeql-analysis.yml

This file was deleted.

3 changes: 3 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ jobs:
bats:
strategy:
fail-fast: false
matrix:
python-version: ['3.12', '3.13']

runs-on: ubuntu-latest
container: debian:trixie
Expand All @@ -64,6 +66,7 @@ jobs:
- name: Set up Python ${{ matrix.python-version }}
run: |
uv python install ${{ matrix.python-version }}
uv python pin ${{ matrix.python-version }}

- name: Install Dependencies
run: |
Expand Down
2 changes: 1 addition & 1 deletion src/gallia/commands/discover/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from gallia.command.base import AsyncScriptConfig
from gallia.command.config import AutoInt, Field
from gallia.log import get_logger
from gallia.net import net_if_broadcast_addrs
from gallia.services.uds.core.service import TesterPresentRequest, TesterPresentResponse
from gallia.transports.doip import (
DiagnosticMessageNegativeAckCodes,
Expand All @@ -29,7 +30,6 @@
TimingAndCommunicationParameters,
VehicleAnnouncementMessage,
)
from gallia.utils import net_if_broadcast_addrs

logger = get_logger(__name__)

Expand Down
3 changes: 2 additions & 1 deletion src/gallia/dumpcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from urllib.parse import urlparse

from gallia.log import get_logger
from gallia.net import split_host_port
from gallia.transports import TargetURI, TransportScheme
from gallia.utils import auto_int, handle_task_error, set_task_handler_ctx_variable, split_host_port
from gallia.utils import auto_int, handle_task_error, set_task_handler_ctx_variable

logger = get_logger(__name__)

Expand Down
115 changes: 115 additions & 0 deletions src/gallia/net.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# SPDX-FileCopyrightText: AISEC Pentesting Team
#
# SPDX-License-Identifier: Apache-2.0

import ipaddress
import subprocess
from urllib.parse import urlparse

import pydantic
from pydantic.networks import IPvAnyAddress

from gallia.log import get_logger
from gallia.utils import supports_platform

logger = get_logger(__name__)


def split_host_port(
hostport: str,
default_port: int | None = None,
) -> tuple[str, int | None]:
"""Splits a combination of ip address/hostname + port into hostname/ip address
and port. The default_port argument can be used to return a port if it is
absent in the hostport argument."""
# Special case: If hostport is an ipv6 then the urlparser does some weird
# things with the colons and tries to parse ports. Catch this case early.
host = ""
port = default_port
try:
# If hostport is a valid ip address (v4 or v6) there
# is no port included
host = str(ipaddress.ip_address(hostport))
except ValueError:
pass

# Only parse if hostport is not a valid ip address.
if host == "":
# urlparse() and urlsplit() insists on absolute URLs starting with "//".
url = urlparse(f"//{hostport}")
host = url.hostname if url.hostname else url.netloc
port = url.port if url.port else default_port
return host, port


def join_host_port(host: str, port: int) -> str:
if ":" in host:
return f"[{host}]:port"
return f"{host}:{port}"


class AddrInfo(pydantic.BaseModel):
family: str
local: IPvAnyAddress
prefixlen: int
broadcast: IPvAnyAddress | None = None
scope: str
label: str | None = None
valid_life_time: int
preferred_life_time: int

def is_v4(self) -> bool:
return self.family == "inet"


class Interface(pydantic.BaseModel):
ifindex: int
ifname: str
flags: list[str]
mtu: int
qdisc: str
operstate: str
group: str
link_type: str
address: str | None = None
broadcast: str | None = None
addr_info: list[AddrInfo]

def is_up(self) -> bool:
return self.operstate == "UP"

def can_broadcast(self) -> bool:
return "BROADCAST" in self.flags


@supports_platform("linux")
def net_if_addrs() -> list[Interface]:
try:
p = subprocess.run(["ip", "-j", "address", "show"], capture_output=True, check=True)
except FileNotFoundError as e:
logger.warning(f"Could not query information about interfaces: {e}")
return []

try:
iface_list_type = pydantic.TypeAdapter(list[Interface])
return iface_list_type.validate_json(p.stdout.decode())
except pydantic.ValidationError as e:
logger.error("BUG: A special case for `ip -j address show` is not handled!")
logger.error("Please report a bug including the following json string.")
logger.error("https://github.com/Fraunhofer-AISEC/gallia/issues")
logger.error(e.json())
raise


@supports_platform("linux")
def net_if_broadcast_addrs() -> list[AddrInfo]:
out = []
for iface in net_if_addrs():
if not (iface.is_up() and iface.can_broadcast()):
continue

for addr in iface.addr_info:
if not addr.is_v4() or addr.broadcast is None:
continue
out.append(addr)
return out
2 changes: 1 addition & 1 deletion src/gallia/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from gallia.log import get_logger
from gallia.net import join_host_port
from gallia.transports.schemes import TransportScheme
from gallia.utils import join_host_port

logger = get_logger(__name__)

Expand Down
119 changes: 17 additions & 102 deletions src/gallia/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,16 @@
import asyncio
import contextvars
import importlib.util
import ipaddress
import json
import logging
import re
import subprocess
import sys
from collections.abc import Awaitable, Callable
from functools import wraps
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse

import aiofiles
import pydantic
from pydantic.networks import IPvAnyAddress

from gallia.log import Loglevel, get_logger

Expand All @@ -48,39 +43,6 @@ def strtobool(val: str) -> bool:
raise ValueError(f"invalid truth value {val!r}")


def split_host_port(
hostport: str,
default_port: int | None = None,
) -> tuple[str, int | None]:
"""Splits a combination of ip address/hostname + port into hostname/ip address
and port. The default_port argument can be used to return a port if it is
absent in the hostport argument."""
# Special case: If hostport is an ipv6 then the urlparser does some weird
# things with the colons and tries to parse ports. Catch this case early.
host = ""
port = default_port
try:
# If hostport is a valid ip address (v4 or v6) there
# is no port included
host = str(ipaddress.ip_address(hostport))
except ValueError:
pass

# Only parse if hostport is not a valid ip address.
if host == "":
# urlparse() and urlsplit() insists on absolute URLs starting with "//".
url = urlparse(f"//{hostport}")
host = url.hostname if url.hostname else url.netloc
port = url.port if url.port else default_port
return host, port


def join_host_port(host: str, port: int) -> str:
if ":" in host:
return f"[{host}]:port"
return f"{host}:{port}"


def camel_to_snake(s: str) -> str:
"""Convert a CamelCase string to a snake_case string."""
# https://stackoverflow.com/a/1176023
Expand Down Expand Up @@ -310,69 +272,22 @@ def handle_task_error(fut: asyncio.Future[Any]) -> None:
logger.info(f"{task_name} ended with error: {e!r}")


class AddrInfo(pydantic.BaseModel):
family: str
local: IPvAnyAddress
prefixlen: int
broadcast: IPvAnyAddress | None = None
scope: str
label: str | None = None
valid_life_time: int
preferred_life_time: int

def is_v4(self) -> bool:
return self.family == "inet"
def supports_platform[T, **P](*platform: str) -> Callable[[Callable[P, T]], Callable[P, T]]:
def decorator(function: Callable[P, T]) -> Callable[P, T]:
@wraps(function)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
supported = False
for p in platform:
if sys.platform == p:
supported = True
break
if supported is False:
raise NotImplementedError(
f'`{function.__name__}()` is not supported on: "{sys.platform}"'
)

return function(*args, **kwargs)

class Interface(pydantic.BaseModel):
ifindex: int
ifname: str
flags: list[str]
mtu: int
qdisc: str
operstate: str
group: str
link_type: str
address: str | None = None
broadcast: str | None = None
addr_info: list[AddrInfo]
return wrapper

def is_up(self) -> bool:
return self.operstate == "UP"

def can_broadcast(self) -> bool:
return "BROADCAST" in self.flags


def net_if_addrs() -> list[Interface]:
if sys.platform != "linux":
raise NotImplementedError("net_if_addrs() is only supported on Linux platforms")

try:
p = subprocess.run(["ip", "-j", "address", "show"], capture_output=True, check=True)
except FileNotFoundError as e:
logger.warning(f"Could not query information about interfaces: {e}")
return []

try:
return [Interface(**item) for item in json.loads(p.stdout.decode())]
except pydantic.ValidationError as e:
logger.error("BUG: A special case for `ip -j address show` is not handled!")
logger.error("Please report a bug including the following json string.")
logger.error("https://github.com/Fraunhofer-AISEC/gallia/issues")
logger.error(e.json())
raise


def net_if_broadcast_addrs() -> list[AddrInfo]:
out = []
for iface in net_if_addrs():
if not (iface.is_up() and iface.can_broadcast()):
continue

for addr in iface.addr_info:
# We only work with broadcastable IPv4.
if not addr.is_v4() or addr.broadcast is None:
continue
out.append(addr)
return out
return decorator
2 changes: 1 addition & 1 deletion tests/pytest/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import pytest

from gallia.log import setup_logging
from gallia.net import split_host_port
from gallia.services.uds.core.utils import (
address_and_size_length,
uds_memory_parameters,
)
from gallia.utils import split_host_port

setup_logging()

Expand Down
2 changes: 1 addition & 1 deletion tests/pytest/test_net_if.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

from gallia.utils import net_if_addrs
from gallia.net import net_if_addrs


def test_net_if_addrs() -> None:
Expand Down
Loading