diff --git a/integration-tests/.pytest.ini b/integration-tests/.pytest.ini index 07fb7b9ee4..1fdfda8065 100644 --- a/integration-tests/.pytest.ini +++ b/integration-tests/.pytest.ini @@ -4,6 +4,7 @@ addopts = --code-highlight=yes --color=yes -rA + -rA --strict-config --strict-markers --verbose @@ -20,3 +21,5 @@ markers = clp_s: mark tests that use the CLP-S storage engine core: mark tests that test the CLP core binaries package: mark tests that run when the CLP package is active + +BASE_PORT = 55000 diff --git a/integration-tests/tests/conftest.py b/integration-tests/tests/conftest.py index 224fefcd0d..28c94488d9 100644 --- a/integration-tests/tests/conftest.py +++ b/integration-tests/tests/conftest.py @@ -1,5 +1,8 @@ """Global pytest setup.""" +import pytest + +# Make the fixtures defined in `tests/fixtures/` globally available without imports. # Make the fixtures defined in `tests/fixtures/` globally available without imports. pytest_plugins = [ "tests.fixtures.integration_test_logs", @@ -7,3 +10,27 @@ "tests.fixtures.package_instance", "tests.fixtures.package_config", ] + + +def pytest_addoption(parser: pytest.Parser) -> None: + """ + Adds options for pytest. + + :param parser: + """ + parser.addini( + "BASE_PORT", + "Base port for CLP package integration tests.", + default="55000", + ) + parser.addoption( + "--job-name", + dest="JOB_NAME", + help="Filter CLP jobs by substring of their job_name.", + ) + parser.addoption( + "--no-jobs", + action="store_true", + dest="NO_JOBS", + help=("Only validate CLP package start and stop. Do not create or run any test jobs."), + ) diff --git a/integration-tests/tests/fixtures/package_config.py b/integration-tests/tests/fixtures/package_config.py index 7f40edf9a3..3781713914 100644 --- a/integration-tests/tests/fixtures/package_config.py +++ b/integration-tests/tests/fixtures/package_config.py @@ -1,14 +1,27 @@ """Fixtures that create and remove temporary config files for CLP packages.""" +import shutil from collections.abc import Iterator import pytest +from clp_py_utils.clp_config import ( + CLP_DEFAULT_DATA_DIRECTORY_PATH, + CLP_DEFAULT_LOG_DIRECTORY_PATH, + CLP_DEFAULT_TMP_DIRECTORY_PATH, +) +from tests.utils.clp_job_utils import ( + build_package_job_list, +) from tests.utils.clp_mode_utils import ( get_clp_config_from_mode, get_required_component_list, ) -from tests.utils.config import PackageConfig, PackagePathConfig +from tests.utils.config import ( + PackageConfig, + PackagePathConfig, +) +from tests.utils.port_utils import assign_ports_from_base @pytest.fixture @@ -26,17 +39,42 @@ def fixt_package_config( clp_config_obj = get_clp_config_from_mode(mode_name) + # Assign ports based on BASE_PORT from ini. + base_port_string = request.config.getini("BASE_PORT") + try: + base_port = int(base_port_string) + except ValueError as err: + err_msg = ( + f"Invalid BASE_PORT value '{base_port_string}' in pytest.ini; expected an integer." + ) + raise ValueError(err_msg) from err + + assign_ports_from_base(base_port, clp_config_obj) + required_components = get_required_component_list(clp_config_obj) + # Build the job list for this mode and the current job filter. + no_jobs: bool = bool(request.config.option.NO_JOBS) + job_filter: str = request.config.option.JOB_NAME or "" + package_job_list = None if no_jobs else build_package_job_list(mode_name, job_filter) + # Construct PackageConfig. package_config = PackageConfig( path_config=fixt_package_path_config, mode_name=mode_name, component_list=required_components, clp_config=clp_config_obj, + package_job_list=package_job_list, ) try: yield package_config finally: package_config.temp_config_file_path.unlink(missing_ok=True) + + # Clear data, tmp, and log from the package directory. + data_dir = package_config.path_config.clp_package_dir / CLP_DEFAULT_DATA_DIRECTORY_PATH + tmp_dir = package_config.path_config.clp_package_dir / CLP_DEFAULT_TMP_DIRECTORY_PATH + log_dir = package_config.path_config.clp_package_dir / CLP_DEFAULT_LOG_DIRECTORY_PATH + for directory_path in (data_dir, tmp_dir, log_dir): + shutil.rmtree(directory_path, ignore_errors=True) diff --git a/integration-tests/tests/fixtures/package_instance.py b/integration-tests/tests/fixtures/package_instance.py index 310e7eaf77..d31d53f2c0 100644 --- a/integration-tests/tests/fixtures/package_instance.py +++ b/integration-tests/tests/fixtures/package_instance.py @@ -15,7 +15,10 @@ @pytest.fixture -def fixt_package_instance(fixt_package_config: PackageConfig) -> Iterator[PackageInstance]: +def fixt_package_instance( + request: pytest.FixtureRequest, + fixt_package_config: PackageConfig, +) -> Iterator[PackageInstance]: """ Starts a CLP package instance for the given configuration and stops it during teardown. @@ -23,12 +26,23 @@ def fixt_package_instance(fixt_package_config: PackageConfig) -> Iterator[Packag :return: Iterator that yields the running package instance. """ mode_name = fixt_package_config.mode_name + no_jobs: bool = bool(request.config.option.NO_JOBS) + package_job_list = fixt_package_config.package_job_list + + # Do not start this mode if there are no jobs and the '--no-jobs' flag wasn't specified by user. + if package_job_list is None and not no_jobs: + pytest.skip(f"No jobs to run for mode {mode_name} with current job filter.") try: start_clp_package(fixt_package_config) instance = PackageInstance(package_config=fixt_package_config) yield instance except RuntimeError: - pytest.fail(f"Failed to start the {mode_name} package.") + base_port_string = request.config.getini("BASE_PORT") + pytest.fail( + f"Failed to start the {mode_name} package. This could mean that one of the ports" + f" derived from BASE_PORT={base_port_string} was unavailable. Try changing BASE_PORT in" + " .pytest.ini." + ) finally: stop_clp_package(fixt_package_config) diff --git a/integration-tests/tests/test_package_start.py b/integration-tests/tests/test_package_start.py index ace9dd6e79..f1064117dd 100644 --- a/integration-tests/tests/test_package_start.py +++ b/integration-tests/tests/test_package_start.py @@ -4,8 +4,17 @@ import pytest +from tests.utils.asserting_utils import ( + validate_package_running, + validate_running_mode_correct, +) +from tests.utils.clp_job_utils import ( + dispatch_test_jobs, +) from tests.utils.clp_mode_utils import CLP_MODE_CONFIGS -from tests.utils.config import PackageInstance +from tests.utils.config import ( + PackageInstance, +) TEST_MODES = CLP_MODE_CONFIGS.keys() @@ -14,19 +23,21 @@ @pytest.mark.package @pytest.mark.parametrize("fixt_package_config", TEST_MODES, indirect=True) -def test_clp_package(fixt_package_instance: PackageInstance) -> None: +def test_clp_package( + request: pytest.FixtureRequest, fixt_package_instance: PackageInstance +) -> None: """ Validate that the CLP package starts up successfully for the selected mode(s) of operation. :param fixt_package_instance: """ - # TODO: write code that properly validates that the package is running. This is a placeholder. - mode_name = fixt_package_instance.package_config.mode_name - logger.info("The '%s' package has been spun up successfully.", mode_name) - - component_list = fixt_package_instance.package_config.component_list - logger.info( - "The '%s' package runs with the following required components :\n%s", - mode_name, - "\n".join(component_list), - ) + # Ensure that all package components are running. + validate_package_running(fixt_package_instance) + + # Ensure that the package is running in the correct mode. + validate_running_mode_correct(fixt_package_instance) + + # Run all jobs. + package_job_list = fixt_package_instance.package_config.package_job_list + if package_job_list is not None: + dispatch_test_jobs(request, fixt_package_instance) diff --git a/integration-tests/tests/utils/asserting_utils.py b/integration-tests/tests/utils/asserting_utils.py index 3658f83ad3..5b6ba7e9df 100644 --- a/integration-tests/tests/utils/asserting_utils.py +++ b/integration-tests/tests/utils/asserting_utils.py @@ -6,6 +6,15 @@ from typing import Any import pytest +from clp_py_utils.clp_config import ClpConfig +from pydantic import ValidationError + +from tests.utils.clp_mode_utils import ( + compute_mode_signature, +) +from tests.utils.config import PackageInstance +from tests.utils.docker_utils import list_running_containers_with_prefix +from tests.utils.utils import load_yaml_to_dict logger = logging.getLogger(__name__) @@ -28,3 +37,47 @@ def run_and_assert(cmd: list[str], **kwargs: Any) -> subprocess.CompletedProcess except subprocess.TimeoutExpired as e: pytest.fail(f"Command timed out: {' '.join(cmd)}: {e}") return proc + + +def validate_package_running(package_instance: PackageInstance) -> None: + """ + Validate that the given package instance is running. Each required component must have at least + one running container whose name matches the expected prefix. Calls pytest.fail on the first + missing component. + + :param package_instance: + """ + instance_id = package_instance.clp_instance_id + required_components = package_instance.package_config.component_list + + for component in required_components: + prefix = f"clp-package-{instance_id}-{component}-" + running_matches = list_running_containers_with_prefix(prefix) + if len(running_matches) == 0: + pytest.fail( + f"No running container found for component '{component}' " + f"(expected name prefix '{prefix}')." + ) + + +def validate_running_mode_correct(package_instance: PackageInstance) -> None: + """ + Validate that the mode described in the shared config of the instance matches the intended mode + defined by the instance configuration. Calls pytest.fail if the shared config fails validation + or if the running mode does not match the intended mode. + + :param package_instance: + """ + shared_config_dict = load_yaml_to_dict(package_instance.shared_config_file_path) + try: + running_config = ClpConfig.model_validate(shared_config_dict) + except ValidationError as err: + pytest.fail(f"Shared config failed validation: {err}") + + intended_config = package_instance.package_config.clp_config + + running_signature = compute_mode_signature(running_config) + intended_signature = compute_mode_signature(intended_config) + + if running_signature != intended_signature: + pytest.fail("Mode mismatch: running configuration does not match intended configuration.") diff --git a/integration-tests/tests/utils/clp_job_utils.py b/integration-tests/tests/utils/clp_job_utils.py new file mode 100644 index 0000000000..e2d303cc15 --- /dev/null +++ b/integration-tests/tests/utils/clp_job_utils.py @@ -0,0 +1,101 @@ +"""Provides utilities related to the test jobs for the CLP package.""" + +import logging + +import pytest + +from tests.utils.config import ( + PackageCompressJob, + PackageInstance, + PackageJobList, +) +from tests.utils.package_utils import compress_with_clp_package + +logger = logging.getLogger(__name__) + + +PACKAGE_COMPRESS_JOBS: dict[str, PackageCompressJob] = { + "compress-postgresql": PackageCompressJob( + job_name="compress-postgresql", + log_fixture_name="postgresql", + mode="clp-json", + log_format="json", + unstructured=False, + dataset_name="postgresql", + timestamp_key="timestamp", + ), + "compress-hive-24hr": PackageCompressJob( + job_name="compress-hive-24hr", + log_fixture_name="hive_24hr", + mode="clp-text", + log_format="text", + unstructured=True, + ), + # Insert more compression jobs here as needed. +} + + +def _matches_keyword(job_name: str, keyword_filter: str) -> bool: + """Return True if this job should be included given the current -k filter.""" + if not keyword_filter: + return True + return keyword_filter.lower() in job_name.lower() + + +def build_package_job_list(mode_name: str, job_filter: str) -> PackageJobList | None: + """ + Builds the list of package jobs for this test run. + + :param mode_name: + :param job_filter: + :return: PackageJobList if there are jobs for this mode, None if not. + """ + logger.debug("Creating job list for mode %s (job filter: %s)", mode_name, job_filter) + + package_compress_jobs: list[PackageCompressJob] = [] + + for job_name, package_compress_job in PACKAGE_COMPRESS_JOBS.items(): + if package_compress_job.mode == mode_name and _matches_keyword(job_name, job_filter): + package_compress_jobs.append(package_compress_job) + + if not package_compress_jobs: + return None + return PackageJobList( + package_compress_jobs=package_compress_jobs, + ) + + +def _run_package_compress_jobs( + request: pytest.FixtureRequest, + package_instance: PackageInstance, +) -> None: + """ + Run all the package compress jobs for this test run. + + :param package_instance: + :param request: + """ + package_job_list = package_instance.package_config.package_job_list + if package_job_list is None: + err_msg = "Package job list is not configured for this package instance." + raise RuntimeError(err_msg) + + compress_jobs = package_job_list.package_compress_jobs + for compress_job in compress_jobs: + compress_with_clp_package(request, compress_job, package_instance) + + +def dispatch_test_jobs(request: pytest.FixtureRequest, package_instance: PackageInstance) -> None: + """ + Dispatches all the package jobs in `job_list` for this package test run. + + :param jobs_list: + """ + logger.debug("Dispatching test jobs.") + + jobs_list = package_instance.package_config.package_job_list + if jobs_list is None: + return + + if jobs_list.package_compress_jobs: + _run_package_compress_jobs(request, package_instance) diff --git a/integration-tests/tests/utils/clp_mode_utils.py b/integration-tests/tests/utils/clp_mode_utils.py index b81f9d381e..f300b2e6da 100644 --- a/integration-tests/tests/utils/clp_mode_utils.py +++ b/integration-tests/tests/utils/clp_mode_utils.py @@ -1,6 +1,7 @@ """Provides utilities related to the user-level configurations of CLP's operating modes.""" from collections.abc import Callable +from typing import Any from clp_py_utils.clp_config import ( API_SERVER_COMPONENT_NAME, @@ -74,6 +75,26 @@ def _to_docker_compose_service_name(name: str) -> str: CLP_MCP_SERVER_COMPONENT = _to_docker_compose_service_name(MCP_SERVER_COMPONENT_NAME) +def compute_mode_signature(config: ClpConfig) -> tuple[Any, ...]: + """ + Constructs a signature that captures the mode-defining aspects of a ClpConfig object. + + :param config: + :return: Tuple that encodes fields used to determine an operating mode. + """ + return ( + config.logs_input.type, + config.package.storage_engine.value, + config.package.query_engine.value, + config.mcp_server is not None, + config.presto is not None, + config.archive_output.storage.type, + config.stream_output.storage.type, + config.aws_config_directory is not None, + config.get_deployment_type(), + ) + + def get_clp_config_from_mode(mode_name: str) -> ClpConfig: """ Return a ClpConfig object for the given mode name. diff --git a/integration-tests/tests/utils/config.py b/integration-tests/tests/utils/config.py index 3b2040b2a8..09088dde22 100644 --- a/integration-tests/tests/utils/config.py +++ b/integration-tests/tests/utils/config.py @@ -2,11 +2,16 @@ from __future__ import annotations +import re from dataclasses import dataclass, field, InitVar from pathlib import Path -from typing import TYPE_CHECKING import yaml +from clp_py_utils.clp_config import ( + CLP_DEFAULT_LOG_DIRECTORY_PATH, + CLP_SHARED_CONFIG_FILENAME, + ClpConfig, +) from tests.utils.utils import ( unlink, @@ -14,9 +19,6 @@ validate_file_exists, ) -if TYPE_CHECKING: - from clp_py_utils.clp_config import ClpConfig - @dataclass(frozen=True) class ClpCorePathConfig: @@ -74,6 +76,9 @@ class PackagePathConfig: #: Directory to store temporary package config files. temp_config_dir: Path = field(init=False, repr=True) + #: Directory where the CLP package writes logs. + clp_log_dir: Path = field(init=False, repr=True) + def __post_init__(self, test_root_dir: Path) -> None: """Validates init values and initializes attributes.""" # Validate that the CLP package directory exists and contains required directories. @@ -93,8 +98,16 @@ def __post_init__(self, test_root_dir: Path) -> None: validate_dir_exists(test_root_dir) object.__setattr__(self, "temp_config_dir", test_root_dir / "temp_config_files") + # Initialize log directory for the package. + object.__setattr__( + self, + "clp_log_dir", + clp_package_dir / CLP_DEFAULT_LOG_DIRECTORY_PATH, + ) + # Create directories if they do not already exist. self.temp_config_dir.mkdir(parents=True, exist_ok=True) + self.clp_log_dir.mkdir(parents=True, exist_ok=True) @property def start_script_path(self) -> Path: @@ -106,6 +119,33 @@ def stop_script_path(self) -> Path: """:return: The absolute path to the package stop script.""" return self.clp_package_dir / "sbin" / "stop-clp.sh" + @property + def compress_script_path(self) -> Path: + """:return: The absolute path to the package compress script.""" + return self.clp_package_dir / "sbin" / "compress.sh" + + +@dataclass(frozen=True) +class PackageCompressJob: + """A compression job for a package test.""" + + job_name: str + log_fixture_name: str + mode: str + log_format: str + unstructured: bool + dataset_name: str | None = None + timestamp_key: str | None = None + tags: list[str] | None = None + subpath: Path | None = None + + +@dataclass(frozen=True) +class PackageJobList: + """List of jobs to run during a package test.""" + + package_compress_jobs: list[PackageCompressJob] + @dataclass(frozen=True) class PackageConfig: @@ -123,6 +163,9 @@ class PackageConfig: #: The Pydantic representation of a CLP package configuration. clp_config: ClpConfig + #: The list of jobs that this package will run during the test. + package_job_list: PackageJobList | None + def __post_init__(self) -> None: """Write the temporary config file for this package.""" self._write_temp_config_file() @@ -151,11 +194,54 @@ class PackageInstance: #: The configuration for this package instance. package_config: PackageConfig + #: The instance ID of the running package. + clp_instance_id: str = field(init=False, repr=True) + + #: The path to the .clp-config.yaml file constructed by the package during spin up. + shared_config_file_path: Path = field(init=False, repr=True) + def __post_init__(self) -> None: """Validates init values and initializes attributes.""" + path_config = self.package_config.path_config + # Validate that the temp config file exists. validate_file_exists(self.package_config.temp_config_file_path) + # Set clp_instance_id from instance-id file. + clp_instance_id_file_path = path_config.clp_log_dir / "instance-id" + validate_file_exists(clp_instance_id_file_path) + clp_instance_id = self._get_clp_instance_id(clp_instance_id_file_path) + object.__setattr__(self, "clp_instance_id", clp_instance_id) + + # Set shared_config_file_path and validate it exists. + shared_config_file_path = path_config.clp_log_dir / CLP_SHARED_CONFIG_FILENAME + validate_file_exists(shared_config_file_path) + object.__setattr__(self, "shared_config_file_path", shared_config_file_path) + + @staticmethod + def _get_clp_instance_id(clp_instance_id_file_path: Path) -> str: + """ + Reads the CLP instance ID from the given file and validates its format. + + :param clp_instance_id_file_path: + :return: The 4-character hexadecimal instance ID. + :raise ValueError: If the file cannot be read or contents are not a 4-character hex string. + """ + try: + contents = clp_instance_id_file_path.read_text(encoding="utf-8").strip() + except OSError as err: + err_msg = f"Cannot read instance-id file '{clp_instance_id_file_path}'" + raise ValueError(err_msg) from err + + if not re.fullmatch(r"[0-9a-fA-F]{4}", contents): + err_msg = ( + f"Invalid instance ID in {clp_instance_id_file_path}: expected a 4-character" + f" hexadecimal string, but read {contents}." + ) + raise ValueError(err_msg) + + return contents + @dataclass(frozen=True) class IntegrationTestPathConfig: diff --git a/integration-tests/tests/utils/docker_utils.py b/integration-tests/tests/utils/docker_utils.py new file mode 100644 index 0000000000..ebab76e46c --- /dev/null +++ b/integration-tests/tests/utils/docker_utils.py @@ -0,0 +1,61 @@ +"""Provide utility functions related to the use of Docker during integration tests.""" + +import re +import shutil +import subprocess +from enum import Enum + + +class DockerStatus(str, Enum): + """Enum of possible Docker Statuses.""" + + created = "created" + restarting = "restarting" + running = "running" + removing = "removing" + paused = "paused" + exited = "exited" + dead = "dead" + + +def get_docker_binary_path() -> str: + """ + Finds the absolute path to the Docker client in the current PATH. + + :return: Absolute path to the Docker binary. + :raise RuntimeError: docker is not found on PATH. + """ + docker_bin = shutil.which("docker") + if docker_bin is None: + err_msg = "docker not found in PATH" + raise RuntimeError(err_msg) + return docker_bin + + +def list_running_containers_with_prefix(prefix: str) -> list[str]: + """ + Lists running Docker containers whose names begin with `prefix` and end with one or more digits. + + :param prefix: + :return: List of running container names that match the pattern. + """ + docker_bin = get_docker_binary_path() + + # fmt: off + docker_ps_cmd = [ + docker_bin, + "ps", + "--format", "{{.Names}}", + "--filter", f"name={prefix}", + "--filter", f"status={DockerStatus.running}", + ] + # fmt: on + ps_proc = subprocess.run(docker_ps_cmd, stdout=subprocess.PIPE, text=True, check=True) + + matches: list[str] = [] + for line in (ps_proc.stdout or "").splitlines(): + name_candidate = line.strip() + if re.fullmatch(re.escape(prefix) + r"\d+", name_candidate): + matches.append(name_candidate) + + return matches diff --git a/integration-tests/tests/utils/package_utils.py b/integration-tests/tests/utils/package_utils.py index 7d714f707a..51de1b5c82 100644 --- a/integration-tests/tests/utils/package_utils.py +++ b/integration-tests/tests/utils/package_utils.py @@ -1,7 +1,14 @@ """Provides utility functions related to the CLP package used across `integration-tests`.""" +import pytest + from tests.utils.asserting_utils import run_and_assert -from tests.utils.config import PackageConfig +from tests.utils.config import ( + IntegrationTestLogs, + PackageCompressJob, + PackageConfig, + PackageInstance, +) DEFAULT_CMD_TIMEOUT_SECONDS = 120.0 @@ -44,3 +51,63 @@ def stop_clp_package(package_config: PackageConfig) -> None: ] # fmt: on run_and_assert(stop_cmd, timeout=DEFAULT_CMD_TIMEOUT_SECONDS) + + +def compress_with_clp_package( + request: pytest.FixtureRequest, + compress_job: PackageCompressJob, + package_instance: PackageInstance, +) -> None: + """ + Construct and run a compression command for the CLP package. + + :param request: + :param compress_job: + :param package_instance: + """ + package_config = package_instance.package_config + compress_script_path = package_config.path_config.compress_script_path + temp_config_file_path = package_config.temp_config_file_path + # Get the correct logs fixture for this job and set up path config objects. + integration_test_logs: IntegrationTestLogs = request.getfixturevalue( + compress_job.log_fixture_name + ) + + # Construct the compression command for this job. + compress_cmd = [ + str(compress_script_path), + "--config", + str(temp_config_file_path), + ] + if compress_job.dataset_name is not None: + compress_cmd.extend( + [ + "--dataset", + compress_job.dataset_name, + ] + ) + if compress_job.timestamp_key is not None: + compress_cmd.extend( + [ + "--timestamp-key", + compress_job.timestamp_key, + ] + ) + if compress_job.unstructured: + compress_cmd.append("--unstructured") + if compress_job.tags is not None: + compress_cmd.extend( + [ + "-t", + ",".join(compress_job.tags), + ] + ) + if compress_job.subpath is not None: + compress_cmd.append(str(integration_test_logs.extraction_dir / compress_job.subpath)) + else: + compress_cmd.append(str(integration_test_logs.extraction_dir)) + + # Run compression command for this job. + run_and_assert(compress_cmd) + + # TODO: Assert that the compression job was successful with package decompression. diff --git a/integration-tests/tests/utils/port_utils.py b/integration-tests/tests/utils/port_utils.py new file mode 100644 index 0000000000..28c5f3c482 --- /dev/null +++ b/integration-tests/tests/utils/port_utils.py @@ -0,0 +1,134 @@ +"""Functions for facilitating the port connections for the CLP package.""" + +import socket +from typing import Any + +from clp_py_utils.clp_config import ClpConfig + +# Port constants. +MIN_NON_PRIVILEGED_PORT = 1024 +MAX_PORT = 65535 +VALID_PORT_RANGE = range(MIN_NON_PRIVILEGED_PORT, MAX_PORT + 1) + +# CLP constants. +REDUCER_MAX_PORTS = 128 +PORT_LIKE_ATTR_NAMES = [ + "port", + "base_port", +] + + +def assign_ports_from_base(base_port: int, clp_config: ClpConfig) -> None: + """ + Assign ports for all active components described in `clp_config` that require a port. Ports are + assigned relative to `base_port`. Assume that port attributes appear directly under each + component with no further nesting, and that each component that requires a port defines only one + port attribute using one of the names in `PORT_LIKE_ATTR_NAMES`. + + :param base_port: + :param clp_config: + :raise ValueError: if the base port is out of range or if any required port is in use. + """ + # Discover which components in ClpConfig have a port attribute. + component_port_targets: list[tuple[Any, str, int]] = [] + for attr_name, attr_value in vars(clp_config).items(): + if attr_name.startswith("_") or attr_value is None: + continue + + port_attr_name: str | None = None + for port_like_name in PORT_LIKE_ATTR_NAMES: + if hasattr(attr_value, port_like_name): + port_attr_name = port_like_name + break + + if port_attr_name is None: + continue + + ports_required_for_component = 1 + if attr_name == "reducer": + # The reducer cluster needs a block of ports starting at its `base_port`. + ports_required_for_component = REDUCER_MAX_PORTS + + component_port_targets.append((attr_value, port_attr_name, ports_required_for_component)) + + # Ensure desired port range is valid and that all ports in the range are available. + total_ports_required = sum( + ports_required_for_component + for _, _, ports_required_for_component in component_port_targets + ) + desired_port_range = range(base_port, base_port + total_ports_required) + _validate_port_range(port_range=desired_port_range) + _validate_ports_available_in_range(host="127.0.0.1", port_range=desired_port_range) + + # Assign ports to the components. + current_port = base_port + for attr_value, port_attr_name, ports_required_for_component in component_port_targets: + setattr(attr_value, port_attr_name, current_port) + current_port += ports_required_for_component + + +def _format_inclusive_port_range(port_range: range) -> str: + """ + Return a prettified string describing an inclusive range. + + :param port_range: + :return: range description of the form "'start' to "'end' inclusive". + """ + start_port = port_range.start + end_port = port_range.stop - 1 + return f"'{start_port}' to '{end_port}' inclusive" + + +def _is_port_free(port: int, host: str) -> bool: + """ + Check whether a TCP port is available for binding. + + :param port: + :param host: + :return: True if the port can be bound, otherwise False. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + try: + sock.bind((host, port)) + except OSError: + return False + return True + + +def _validate_ports_available_in_range(host: str, port_range: range) -> None: + """ + Validate that each port in `port_range` is available on `host`. + + :param host: + :param port_range: + :raise ValueError: if any port in the range cannot be bound. + """ + for port in port_range: + if not _is_port_free(port=port, host=host): + desired_range_str = _format_inclusive_port_range(port_range) + err_msg = ( + f"Port '{port}' in the desired range ({desired_range_str}) is already in use." + " Choose a different port range for the test environment." + ) + raise ValueError(err_msg) + + +def _validate_port_range(port_range: range) -> None: + """ + Validate that `port_range` falls completely within `VALID_PORT_RANGE`. + + :param port_range: + :raise ValueError: if any part of `port_range` falls outside `VALID_PORT_RANGE`. + """ + required_start_port = port_range.start + required_end_port = port_range.stop - 1 + min_valid_port = VALID_PORT_RANGE.start + max_valid_port = VALID_PORT_RANGE.stop - 1 + if required_start_port < min_valid_port or required_end_port > max_valid_port: + required_range_str = _format_inclusive_port_range(port_range) + valid_range_str = _format_inclusive_port_range(VALID_PORT_RANGE) + err_msg = ( + f"The port range derived from --clp-base-port ({required_range_str}) must fall within" + f" the range of valid ports ({valid_range_str})." + ) + raise ValueError(err_msg) diff --git a/integration-tests/tests/utils/utils.py b/integration-tests/tests/utils/utils.py index 54bbaf9148..ba3a85f9eb 100644 --- a/integration-tests/tests/utils/utils.py +++ b/integration-tests/tests/utils/utils.py @@ -5,7 +5,9 @@ import subprocess from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO +from typing import Any, IO + +import yaml def get_env_var(var_name: str) -> str: @@ -51,6 +53,33 @@ def is_json_file_structurally_equal(json_fp1: Path, json_fp2: Path) -> bool: return is_dir_tree_content_equal(Path(temp_file_1.name), Path(temp_file_2.name)) +def load_yaml_to_dict(path: Path) -> dict[str, Any]: + """ + Parses a UTF-8 YAML file into a dictionary. + + :param path: + :return: Dictionary parsed from the file. + :raise ValueError: if the file contains invalid YAML. + :raise ValueError: if the file cannot be read. + :raise TypeError: if the file does not have a top-level mapping. + """ + try: + with path.open("r", encoding="utf-8") as file: + target_dict = yaml.safe_load(file) + except yaml.YAMLError as err: + err_msg = f"Invalid YAML in target file '{path}'" + raise ValueError(err_msg) from err + except OSError as err: + err_msg = f"Cannot read target file '{path}'" + raise ValueError(err_msg) from err + + if not isinstance(target_dict, dict): + err_msg = f"Target file {path} must have a top-level mapping." + raise TypeError(err_msg) + + return target_dict + + def resolve_path_env_var(var_name: str) -> Path: """ :param var_name: Name of the environment variable holding a path.