diff --git a/integration-tests/.pytest.ini b/integration-tests/.pytest.ini index 6235100f27..5e1e751f1a 100644 --- a/integration-tests/.pytest.ini +++ b/integration-tests/.pytest.ini @@ -3,6 +3,7 @@ addopts = --capture=no --code-highlight=yes --color=yes + -rA --strict-config --strict-markers --verbose @@ -18,3 +19,6 @@ markers = clp: mark tests that use the CLP storage engine clp_s: mark tests that use the CLP-S storage engine core: mark tests that test the CLP core binaries + package: mark tests that run when the CLP package is active + +BASE_PORT = 55000 diff --git a/integration-tests/pyproject.toml b/integration-tests/pyproject.toml index 44f079fdd4..9b9f1313c9 100644 --- a/integration-tests/pyproject.toml +++ b/integration-tests/pyproject.toml @@ -27,6 +27,8 @@ dev = [ "ruff>=0.11.12", "pytest>=8.4.1", "pytest-env>=1.1.5", + "PyYAML>=6.0", + "types-PyYAML>=6.0.12.20240808", ] [tool.mypy] diff --git a/integration-tests/tests/conftest.py b/integration-tests/tests/conftest.py index e3b07cd0de..c2002da4ab 100644 --- a/integration-tests/tests/conftest.py +++ b/integration-tests/tests/conftest.py @@ -1,6 +1,35 @@ -"""Make the fixtures defined in `tests/fixtures/` globally available without imports.""" +"""Global pytest setup.""" +import pytest + +# Make the fixtures defined in `tests/fixtures/` globally available without imports. pytest_plugins = [ "tests.fixtures.integration_test_logs", "tests.fixtures.path_configs", + "tests.fixtures.package_instance", + "tests.fixtures.package_config", ] + + +def pytest_addoption(parser: pytest.Parser) -> None: + """ + Adds options for pytest. + + :param parser: + """ + parser.addini( + "BASE_PORT", + "Base port for CLP package integration tests.", + default="55000", + ) + parser.addoption( + "--job-name", + dest="JOB_NAME", + help="Filter CLP jobs by substring of their job_name.", + ) + parser.addoption( + "--no-jobs", + action="store_true", + dest="NO_JOBS", + help=("Only validate CLP package start and stop. 
Do not create or run any test jobs."), + ) diff --git a/integration-tests/tests/fixtures/integration_test_logs.py b/integration-tests/tests/fixtures/integration_test_logs.py index 0a876f72e5..0f10571440 100644 --- a/integration-tests/tests/fixtures/integration_test_logs.py +++ b/integration-tests/tests/fixtures/integration_test_logs.py @@ -43,6 +43,20 @@ def postgresql( ) +@pytest.fixture(scope="session") +def spark_event_logs( + request: pytest.FixtureRequest, + integration_test_path_config: IntegrationTestPathConfig, +) -> IntegrationTestLogs: + """Provides shared `spark_event_logs` test logs.""" + return _download_and_extract_dataset( + request=request, + integration_test_path_config=integration_test_path_config, + name="spark-event-logs", + tarball_url="https://zenodo.org/records/10516346/files/spark-event-logs.tar.gz?download=1", + ) + + def _download_and_extract_dataset( request: pytest.FixtureRequest, integration_test_path_config: IntegrationTestPathConfig, diff --git a/integration-tests/tests/fixtures/package_config.py b/integration-tests/tests/fixtures/package_config.py new file mode 100644 index 0000000000..78a8c0a5cb --- /dev/null +++ b/integration-tests/tests/fixtures/package_config.py @@ -0,0 +1,88 @@ +"""Fixtures that create and remove temporary config files for CLP packages.""" + +import contextlib +import logging +from collections.abc import Iterator + +import pytest +from clp_py_utils.clp_config import ( + CLP_DEFAULT_DATA_DIRECTORY_PATH, + CLP_DEFAULT_TMP_DIRECTORY_PATH, +) + +from tests.utils.clp_job_utils import ( + build_package_job_list, +) +from tests.utils.clp_mode_utils import ( + get_clp_config_from_mode, + get_required_component_list, +) +from tests.utils.config import ( + PackageConfig, + PackagePathConfig, +) +from tests.utils.port_utils import assign_ports_from_base +from tests.utils.utils import unlink + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def fixt_package_config( + fixt_package_path_config: PackagePathConfig, 
+ request: pytest.FixtureRequest, +) -> Iterator[PackageConfig]: + """ + Creates and maintains a PackageConfig object for a specific CLP mode. + + :param request: + :return: An iterator that yields the PackageConfig object for the specified mode. + """ + mode_name: str = request.param + logger.debug("Creating a temporary config file for the %s package.", mode_name) + + # Get the ClpConfig for this mode. + clp_config_obj = get_clp_config_from_mode(mode_name) + + # Assign ports based on BASE_PORT from ini. + base_port_string = request.config.getini("BASE_PORT") + try: + base_port = int(base_port_string) + except ValueError as err: + err_msg = ( + f"Invalid BASE_PORT value '{base_port_string}' in pytest.ini; expected an integer." + ) + raise ValueError(err_msg) from err + + assign_ports_from_base(base_port, clp_config_obj) + + # Compute the list of required components for this mode. + required_components = get_required_component_list(clp_config_obj) + + # Build the job list for this mode and the current job filter. + no_jobs: bool = bool(request.config.option.NO_JOBS) + job_filter: str = request.config.option.JOB_NAME or "" + package_job_list = None if no_jobs else build_package_job_list(mode_name, job_filter) + + # Construct PackageConfig. + package_config = PackageConfig( + path_config=fixt_package_path_config, + mode_name=mode_name, + component_list=required_components, + clp_config=clp_config_obj, + package_job_list=package_job_list, + ) + + try: + yield package_config + finally: + logger.debug("Removing the temporary config file and var contents.") + + with contextlib.suppress(FileNotFoundError): + package_config.temp_config_file_path.unlink() + + # Clear data, tmp, and log from the package directory. 
+ data_dir = package_config.path_config.clp_package_dir / CLP_DEFAULT_DATA_DIRECTORY_PATH + tmp_dir = package_config.path_config.clp_package_dir / CLP_DEFAULT_TMP_DIRECTORY_PATH + for directory_path in (data_dir, tmp_dir): + unlink(directory_path) diff --git a/integration-tests/tests/fixtures/package_instance.py b/integration-tests/tests/fixtures/package_instance.py new file mode 100644 index 0000000000..a4b638420d --- /dev/null +++ b/integration-tests/tests/fixtures/package_instance.py @@ -0,0 +1,60 @@ +"""Fixtures that start and stop CLP package instances for integration tests.""" + +import logging +import subprocess +from collections.abc import Iterator + +import pytest + +from tests.utils.config import ( + PackageConfig, + PackageInstance, +) +from tests.utils.package_utils import ( + start_clp_package, + stop_clp_package, +) + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def fixt_package_instance( + fixt_package_config: PackageConfig, + request: pytest.FixtureRequest, +) -> Iterator[PackageInstance]: + """ + Starts a CLP package instance for the given configuration and stops it during teardown. + + :param fixt_package_config: + :param request: + :return: Iterator that yields the running package instance. + """ + mode_name = fixt_package_config.mode_name + no_jobs: bool = bool(request.config.option.NO_JOBS) + instance: PackageInstance | None = None + package_job_list = fixt_package_config.package_job_list + + # Do not start this mode if there are no jobs and the '--no-jobs' flag wasn't specified by user. 
+ if package_job_list is None and not no_jobs: + pytest.skip(f"No jobs to run for mode {mode_name} with current job filter.") + + try: + logger.debug("Starting up the %s package.", mode_name) + start_clp_package(fixt_package_config) + instance = PackageInstance(package_config=fixt_package_config) + yield instance + except RuntimeError: + base_port_string = request.config.getini("BASE_PORT") + pytest.fail( + f"Failed to start the {mode_name} package. This could mean that one of the ports" + f" derived from BASE_PORT={base_port_string} was unavailable. Try changing BASE_PORT in" + " .pytest.ini." + ) + finally: + logger.debug("Now stopping the %s package...", mode_name) + if instance is not None: + stop_clp_package(instance) + else: + # This means setup failed after start; fall back to calling stop script directly + subprocess.run([str(fixt_package_config.path_config.stop_script_path)], check=False) diff --git a/integration-tests/tests/fixtures/path_configs.py b/integration-tests/tests/fixtures/path_configs.py index 023da981a1..45fd1cc8d7 100644 --- a/integration-tests/tests/fixtures/path_configs.py +++ b/integration-tests/tests/fixtures/path_configs.py @@ -25,6 +25,11 @@ def integration_test_path_config() -> IntegrationTestPathConfig: @pytest.fixture(scope="session") -def package_path_config() -> PackagePathConfig: - """Provides paths for the clp-package directory and its contents.""" - return PackagePathConfig(clp_package_dir=resolve_path_env_var("CLP_PACKAGE_DIR")) +def fixt_package_path_config( + integration_test_path_config: IntegrationTestPathConfig, +) -> PackagePathConfig: + """Fixture that provides a PackagePathConfig shared across tests.""" + return PackagePathConfig( + clp_package_dir=resolve_path_env_var("CLP_PACKAGE_DIR"), + test_root_dir=integration_test_path_config.test_root_dir, + ) diff --git a/integration-tests/tests/test_package_start.py b/integration-tests/tests/test_package_start.py new file mode 100644 index 0000000000..ec3aa11ff6 --- /dev/null 
+++ b/integration-tests/tests/test_package_start.py @@ -0,0 +1,59 @@ +"""Integration tests verifying that the CLP package can be started and stopped.""" + +import logging + +import pytest + +from tests.utils.asserting_utils import ( + validate_package_running, + validate_running_mode_correct, +) +from tests.utils.clp_job_utils import ( + dispatch_test_jobs, +) +from tests.utils.clp_mode_utils import CLP_MODE_CONFIGS +from tests.utils.config import ( + PackageInstance, +) + +TEST_MODES = CLP_MODE_CONFIGS.keys() + +logger = logging.getLogger(__name__) + + +@pytest.mark.package +@pytest.mark.parametrize("fixt_package_config", TEST_MODES, indirect=True) +def test_clp_package( + request: pytest.FixtureRequest, fixt_package_instance: PackageInstance +) -> None: + """ + Validate that all of the components of the CLP package start up successfully for the selected + mode of operation. + + :param fixt_package_instance: + """ + mode_name = fixt_package_instance.package_config.mode_name + instance_id = fixt_package_instance.clp_instance_id + + # Ensure that all package components are running. + logger.debug( + "Checking if all components of %s package with instance ID '%s' are running properly.", + mode_name, + instance_id, + ) + + validate_package_running(fixt_package_instance) + + # Ensure that the package is running in the correct mode. + logger.debug( + "Checking that the %s package with instance ID '%s' is running in the correct mode.", + mode_name, + instance_id, + ) + + validate_running_mode_correct(fixt_package_instance) + + # Run all jobs. 
+ package_job_list = fixt_package_instance.package_config.package_job_list + if package_job_list is not None: + dispatch_test_jobs(request, fixt_package_instance) diff --git a/integration-tests/tests/utils/asserting_utils.py b/integration-tests/tests/utils/asserting_utils.py index fe39ec4576..acd8947de9 100644 --- a/integration-tests/tests/utils/asserting_utils.py +++ b/integration-tests/tests/utils/asserting_utils.py @@ -4,6 +4,15 @@ from typing import Any import pytest +from clp_py_utils.clp_config import ClpConfig +from pydantic import ValidationError + +from tests.utils.clp_mode_utils import ( + compute_mode_signature, +) +from tests.utils.config import PackageInstance +from tests.utils.docker_utils import list_running_containers_with_prefix +from tests.utils.utils import load_yaml_to_dict def run_and_assert(cmd: list[str], **kwargs: Any) -> subprocess.CompletedProcess[Any]: @@ -20,3 +29,47 @@ def run_and_assert(cmd: list[str], **kwargs: Any) -> subprocess.CompletedProcess except subprocess.CalledProcessError as e: pytest.fail(f"Command failed: {' '.join(cmd)}: {e}") return proc + + +def validate_package_running(package_instance: PackageInstance) -> None: + """ + Validate that the given package instance is running. Each required component must have at least + one running container whose name matches the expected prefix. Calls pytest.fail on the first + missing component. + + :param package_instance: + """ + instance_id = package_instance.clp_instance_id + required_components = package_instance.package_config.component_list + + for component in required_components: + prefix = f"clp-package-{instance_id}-{component}-" + running_matches = list_running_containers_with_prefix(prefix) + if len(running_matches) == 0: + pytest.fail( + f"No running container found for component '{component}' " + f"(expected name prefix '{prefix}')." 
+ ) + + +def validate_running_mode_correct(package_instance: PackageInstance) -> None: + """ + Validate that the mode described in the shared config of the instance matches the intended mode + defined by the instance configuration. Calls pytest.fail if the shared config fails validation + or if the running mode does not match the intended mode. + + :param package_instance: + """ + shared_config_dict = load_yaml_to_dict(package_instance.shared_config_file_path) + try: + running_config = ClpConfig.model_validate(shared_config_dict) + except ValidationError as err: + pytest.fail(f"Shared config failed validation: {err}") + + intended_config = package_instance.package_config.clp_config + + running_signature = compute_mode_signature(running_config) + intended_signature = compute_mode_signature(intended_config) + + if running_signature != intended_signature: + pytest.fail("Mode mismatch: running configuration does not match intended configuration.") diff --git a/integration-tests/tests/utils/clp_job_utils.py b/integration-tests/tests/utils/clp_job_utils.py new file mode 100644 index 0000000000..55f408171a --- /dev/null +++ b/integration-tests/tests/utils/clp_job_utils.py @@ -0,0 +1,195 @@ +"""Provides utilities related to the test jobs for the CLP package.""" + +import logging +from pathlib import Path + +import pytest +from clp_py_utils.clp_config import ( + CLP_DEFAULT_DATA_DIRECTORY_PATH, + CLP_DEFAULT_TMP_DIRECTORY_PATH, +) + +from tests.utils.config import ( + PackageCompressJob, + PackageInstance, + PackageJobList, + PackageSearchJob, +) +from tests.utils.package_utils import compress_with_clp_package, search_with_clp_package +from tests.utils.utils import unlink + +logger = logging.getLogger(__name__) + + +PACKAGE_COMPRESS_JOBS: dict[str, PackageCompressJob] = { + "compress-postgresql": PackageCompressJob( + job_name="compress-postgresql", + log_fixture_name="postgresql", + mode="clp-json", + log_format="json", + unstructured=False, + dataset_name="postgresql", + 
timestamp_key="timestamp", + ), + "compress-spark-event-logs": PackageCompressJob( + job_name="compress-spark-event-logs", + log_fixture_name="spark_event_logs", + mode="clp-json", + log_format="json", + unstructured=False, + dataset_name="spark-event-logs", + timestamp_key="Timestamp", + ), + "compress-default-dataset": PackageCompressJob( + job_name="compress-default-dataset", + log_fixture_name="postgresql", + mode="clp-json", + log_format="json", + unstructured=False, + timestamp_key="timestamp", + ), + "compress-tagged-data-spark": PackageCompressJob( + job_name="compress-tagged-data-spark", + log_fixture_name="spark_event_logs", + mode="clp-json", + log_format="json", + unstructured=False, + dataset_name="tagged_data", + timestamp_key="Timestamp", + subpath=Path("spark-event-logs") / "app-20211007095008-0000", + tags=[ + "tag1", + ], + ), + "compress-hive-24hr": PackageCompressJob( + job_name="compress-hive-24hr", + log_fixture_name="hive_24hr", + mode="clp-text", + log_format="text", + unstructured=True, + ), + "compress-tagged-data-hive": PackageCompressJob( + job_name="compress-tagged-data-hive", + log_fixture_name="hive_24hr", + mode="clp-text", + log_format="text", + unstructured=True, + subpath=( + Path("hive-24hr") + / "i-0ac90a05" + / "application_1427088391284_0001" + / "container_1427088391284_0001_01_000124" + / "syslog" + ), + tags=[ + "tag1", + ], + ), + # Insert more compression jobs here as needed. 
+}
+
+PACKAGE_SEARCH_JOBS: dict[str, PackageSearchJob] = {
+    "search-basic-postgresql": PackageSearchJob(
+        job_name="search-basic-postgresql",
+        mode="clp-json",
+        package_compress_job=PACKAGE_COMPRESS_JOBS["compress-postgresql"],
+        ignore_case=False,
+        count=False,
+        wildcard_query='message: "next transaction ID: 735; next OID: 16388"',
+        desired_result=(
+            '{"timestamp":"2023-03-27 00:26:35.873","pid":7813,'
+            '"session_id":"64211afb.1e85",'
+            '"line_num":4,"session_start":"2023-03-27 00:26:35 EDT","txid":0,'
+            '"error_severity":"DEBUG","message":"next transaction ID: 735; '
+            'next OID: 16388",'
+            '"backend_type":"startup","query_id":0}\n'
+        ),
+    ),
+    "search-basic-hive": PackageSearchJob(
+        job_name="search-basic-hive",
+        mode="clp-text",
+        package_compress_job=PACKAGE_COMPRESS_JOBS["compress-hive-24hr"],
+        ignore_case=False,
+        count=False,
+        wildcard_query="Shuffle@79ec394e",
+        desired_result=(
+            "2015-03-23 05:40:08,988 INFO [main] "
+            "org.apache.hadoop.mapred.ReduceTask: "
+            "Using ShuffleConsumerPlugin: "
+            "org.apache.hadoop.mapreduce.task.reduce.Shuffle@79ec394e\n"
+        ),
+    ),
+    # Insert more search jobs here as needed.
+}
+
+
+def _matches_keyword(job_name: str, keyword_filter: str) -> bool:
+    """Return True if this job should be included given the current --job-name filter."""
+    if not keyword_filter:
+        return True
+    return keyword_filter.lower() in job_name.lower()
+
+
+def build_package_job_list(mode_name: str, job_filter: str) -> PackageJobList | None:
+    """
+    Builds the list of package jobs for this test run.
+
+    :param mode_name:
+    :param job_filter:
+    :return: PackageJobList if there are jobs for this mode, None if not.
+ """ + logger.debug("Creating job list for mode %s (job filter: %s)", mode_name, job_filter) + + package_compress_jobs: list[PackageCompressJob] = [] + package_search_jobs: list[PackageSearchJob] = [] + + for job_name, package_compress_job in PACKAGE_COMPRESS_JOBS.items(): + if package_compress_job.mode == mode_name and _matches_keyword(job_name, job_filter): + package_compress_jobs.append(package_compress_job) + + for job_name, package_search_job in PACKAGE_SEARCH_JOBS.items(): + if package_search_job.mode == mode_name and _matches_keyword(job_name, job_filter): + package_search_jobs.append(package_search_job) + if package_search_job.package_compress_job not in package_compress_jobs: + package_compress_jobs.append(package_search_job.package_compress_job) + + if not package_compress_jobs and not package_search_jobs: + return None + return PackageJobList( + package_compress_jobs=package_compress_jobs, + package_search_jobs=package_search_jobs, + ) + + +def dispatch_test_jobs(request: pytest.FixtureRequest, package_instance: PackageInstance) -> None: + """ + Dispatches all the package jobs in `job_list` for this package test run. Each compression job is + followed immediately by the search jobs that depend on it. + + :param request: + :param package_instance: + """ + package_job_list = package_instance.package_config.package_job_list + if package_job_list is None: + err_msg = "Package job list is not configured for this package instance." + raise RuntimeError(err_msg) + + package_compress_jobs = package_job_list.package_compress_jobs + package_search_jobs = package_job_list.package_search_jobs + clp_package_dir = package_instance.package_config.path_config.clp_package_dir + + # For each compression job, run it, then its dependent search jobs, then cleanup. + for package_compress_job in package_compress_jobs: + # Run the compression job. 
+ compress_with_clp_package(request, package_compress_job, package_instance) + + # Run the search jobs from the list that depend on the compression job. + for package_search_job in package_search_jobs: + if package_search_job.package_compress_job is package_compress_job: + search_with_clp_package(package_search_job, package_instance) + + # Cleanup this compress job to prevent multiple compression jobs stored in archives. + data_dir = clp_package_dir / CLP_DEFAULT_DATA_DIRECTORY_PATH + tmp_dir = clp_package_dir / CLP_DEFAULT_TMP_DIRECTORY_PATH + for directory_path in (data_dir, tmp_dir): + unlink(directory_path) diff --git a/integration-tests/tests/utils/clp_mode_utils.py b/integration-tests/tests/utils/clp_mode_utils.py new file mode 100644 index 0000000000..afc482992e --- /dev/null +++ b/integration-tests/tests/utils/clp_mode_utils.py @@ -0,0 +1,136 @@ +"""Provides utilities related to the user-level configurations of CLP's operating modes.""" + +from collections.abc import Callable +from typing import Any + +from clp_py_utils.clp_config import ( + API_SERVER_COMPONENT_NAME, + ClpConfig, + COMPRESSION_SCHEDULER_COMPONENT_NAME, + COMPRESSION_WORKER_COMPONENT_NAME, + DB_COMPONENT_NAME, + DeploymentType, + GARBAGE_COLLECTOR_COMPONENT_NAME, + MCP_SERVER_COMPONENT_NAME, + Package, + QUERY_SCHEDULER_COMPONENT_NAME, + QUERY_WORKER_COMPONENT_NAME, + QueryEngine, + QUEUE_COMPONENT_NAME, + REDIS_COMPONENT_NAME, + REDUCER_COMPONENT_NAME, + RESULTS_CACHE_COMPONENT_NAME, + StorageEngine, + WEBUI_COMPONENT_NAME, +) + +CLP_MODE_CONFIGS: dict[str, Callable[[], ClpConfig]] = { + "clp-text": lambda: ClpConfig( + package=Package( + storage_engine=StorageEngine.CLP, + query_engine=QueryEngine.CLP, + ), + ), + "clp-json": lambda: ClpConfig( + package=Package( + storage_engine=StorageEngine.CLP_S, + query_engine=QueryEngine.CLP_S, + ), + ), +} + + +# TODO: This will eventually be replaced by a formalized mapping between component and service. 
+def _to_docker_compose_service_name(name: str) -> str: + """ + Convert a component name to a Docker Compose service name. + + :param name: + :return: Service name with underscores replaced by hyphens + """ + return name.replace("_", "-") + + +CLP_BASE_COMPONENTS = [ + _to_docker_compose_service_name(DB_COMPONENT_NAME), + _to_docker_compose_service_name(QUEUE_COMPONENT_NAME), + _to_docker_compose_service_name(REDIS_COMPONENT_NAME), + _to_docker_compose_service_name(REDUCER_COMPONENT_NAME), + _to_docker_compose_service_name(RESULTS_CACHE_COMPONENT_NAME), + _to_docker_compose_service_name(COMPRESSION_SCHEDULER_COMPONENT_NAME), + _to_docker_compose_service_name(COMPRESSION_WORKER_COMPONENT_NAME), + _to_docker_compose_service_name(API_SERVER_COMPONENT_NAME), + _to_docker_compose_service_name(WEBUI_COMPONENT_NAME), +] + +CLP_QUERY_COMPONENTS = [ + _to_docker_compose_service_name(QUERY_SCHEDULER_COMPONENT_NAME), + _to_docker_compose_service_name(QUERY_WORKER_COMPONENT_NAME), +] + +CLP_GARBAGE_COLLECTOR_COMPONENT = _to_docker_compose_service_name(GARBAGE_COLLECTOR_COMPONENT_NAME) + +CLP_MCP_SERVER_COMPONENT = _to_docker_compose_service_name(MCP_SERVER_COMPONENT_NAME) + + +def compute_mode_signature(config: ClpConfig) -> tuple[Any, ...]: + """ + Constructs a signature that captures the mode-defining aspects of a ClpConfig object. + + :param config: + :return: Tuple that encodes fields used to determine an operating mode. + """ + return ( + config.logs_input.type, + config.package.storage_engine.value, + config.package.query_engine.value, + config.mcp_server is not None, + config.presto is not None, + config.archive_output.storage.type, + config.stream_output.storage.type, + config.aws_config_directory is not None, + config.get_deployment_type(), + ) + + +def get_clp_config_from_mode(mode_name: str) -> ClpConfig: + """ + Return a ClpConfig object for the given mode name. + + :param mode_name: + :return: ClpConfig object corresponding to the mode. 
+ :raise ValueError: If the mode is not supported. + """ + try: + config = CLP_MODE_CONFIGS[mode_name] + except KeyError as err: + err_msg = f"Unsupported mode: {mode_name}" + raise ValueError(err_msg) from err + return config() + + +def get_required_component_list(config: ClpConfig) -> list[str]: + """ + Constructs a list of the components that the CLP package described in `config` needs to run + properly. + + :param config: + :return: List of components required by the package. + """ + component_list: list[str] = [] + component_list.extend(CLP_BASE_COMPONENTS) + + deployment_type = config.get_deployment_type() + if deployment_type == DeploymentType.FULL: + component_list.extend(CLP_QUERY_COMPONENTS) + + if ( + config.archive_output.retention_period is not None + or config.results_cache.retention_period is not None + ): + component_list.append(CLP_GARBAGE_COLLECTOR_COMPONENT) + + if config.mcp_server is not None: + component_list.append(CLP_MCP_SERVER_COMPONENT) + + return component_list diff --git a/integration-tests/tests/utils/config.py b/integration-tests/tests/utils/config.py index 192e2f892d..a785c5c4ce 100644 --- a/integration-tests/tests/utils/config.py +++ b/integration-tests/tests/utils/config.py @@ -2,12 +2,21 @@ from __future__ import annotations +import re from dataclasses import dataclass, field, InitVar from pathlib import Path +import yaml +from clp_py_utils.clp_config import ( + CLP_DEFAULT_LOG_DIRECTORY_PATH, + CLP_SHARED_CONFIG_FILENAME, + ClpConfig, +) + from tests.utils.utils import ( unlink, validate_dir_exists, + validate_file_exists, ) @@ -61,8 +70,18 @@ class PackagePathConfig: #: Root directory containing all CLP package contents. clp_package_dir: Path - def __post_init__(self) -> None: - """Validates that the CLP package directory exists and contains all required directories.""" + #: Root directory for package tests output. + test_root_dir: InitVar[Path] + + #: Directory to store any cached package config files. 
+    temp_config_dir: Path = field(init=False, repr=True)
+
+    #: Directory where the CLP package writes logs.
+    clp_log_dir: Path = field(init=False, repr=True)
+
+    def __post_init__(self, test_root_dir: Path) -> None:
+        """Validates init values and initializes attributes."""
+        # Validate that the CLP package directory exists and contains required directories.
+        clp_package_dir = self.clp_package_dir
         validate_dir_exists(clp_package_dir)
@@ -75,6 +94,178 @@ def __post_init__(self) -> None:
             )
             raise RuntimeError(err_msg)
 
+        # Initialize cache directory for package tests.
+        validate_dir_exists(test_root_dir)
+        object.__setattr__(self, "temp_config_dir", test_root_dir / "temp_config_files")
+
+        # Initialize log directory for the package.
+        object.__setattr__(
+            self,
+            "clp_log_dir",
+            clp_package_dir / CLP_DEFAULT_LOG_DIRECTORY_PATH,
+        )
+
+        # Create directories if they do not already exist.
+        self.temp_config_dir.mkdir(parents=True, exist_ok=True)
+        self.clp_log_dir.mkdir(parents=True, exist_ok=True)
+
+    @property
+    def start_script_path(self) -> Path:
+        """:return: The absolute path to the package start script."""
+        return self.clp_package_dir / "sbin" / "start-clp.sh"
+
+    @property
+    def stop_script_path(self) -> Path:
+        """:return: The absolute path to the package stop script."""
+        return self.clp_package_dir / "sbin" / "stop-clp.sh"
+
+    @property
+    def compress_script_path(self) -> Path:
+        """:return: The absolute path to the package compress script."""
+        return self.clp_package_dir / "sbin" / "compress.sh"
+
+    @property
+    def search_script_path(self) -> Path:
+        """:return: The absolute path to the package search script."""
+        return self.clp_package_dir / "sbin" / "search.sh"
+
+
+@dataclass(frozen=True)
+class PackageCompressJob:
+    """A compression job for a package test."""
+
+    job_name: str
+    log_fixture_name: str
+    mode: str
+    log_format: str
+    unstructured: bool
+    dataset_name: str | None = None
+    timestamp_key: str | None = None
+    tags: list[str] | None = None
+    subpath: Path | None = None
+
+
+@dataclass(frozen=True)
+class PackageSearchJob:
+    """A search job for a package test."""
+
+    job_name: str
+    mode: str
+    package_compress_job: PackageCompressJob
+    ignore_case: bool
+    count: bool
+    wildcard_query: str
+    desired_result: str
+    begin_time: int | None = None
+    end_time: int | None = None
+    file_path: Path | None = None
+    count_by_time: int | None = None
+
+
+@dataclass(frozen=True)
+class PackageJobList:
+    """List of jobs to run during a package test."""
+
+    package_compress_jobs: list[PackageCompressJob]
+    package_search_jobs: list[PackageSearchJob]
+    # TODO: add job types as needed.
+
+
+@dataclass(frozen=True)
+class PackageConfig:
+    """Metadata for a specific configuration of the CLP package."""
+
+    #: Path configuration for this package.
+    path_config: PackagePathConfig
+
+    #: Name of the mode of operation represented in this config.
+    mode_name: str
+
+    #: The list of CLP components that this package needs.
+    component_list: list[str]
+
+    #: The ClpConfig instance that describes this package configuration.
+    clp_config: ClpConfig
+
+    #: The list of jobs that this package will run during the test.
+ package_job_list: PackageJobList | None + + def __post_init__(self) -> None: + """Write the temporary config file for this package.""" + self._write_temp_config_file() + + @property + def temp_config_file_path(self) -> Path: + """:return: The absolute path to the temporary configuration file for the package.""" + return self.path_config.temp_config_dir / f"clp-config-{self.mode_name}.yaml" + + def _write_temp_config_file(self) -> None: + """Writes the temporary config file for this package.""" + temp_config_file_path = self.temp_config_file_path + + payload = self.clp_config.dump_to_primitive_dict() # type: ignore[no-untyped-call] + + tmp_path = temp_config_file_path.with_suffix(temp_config_file_path.suffix + ".tmp") + with tmp_path.open("w", encoding="utf-8") as f: + yaml.safe_dump(payload, f, sort_keys=False) + tmp_path.replace(temp_config_file_path) + + +@dataclass(frozen=True) +class PackageInstance: + """Metadata for a running instance of the CLP package.""" + + #: Config describing this package instance. + package_config: PackageConfig + + #: The instance ID of the running package. + clp_instance_id: str = field(init=False, repr=True) + + #: The path to the .clp-config.yaml file constructed by the package during spin up. + shared_config_file_path: Path = field(init=False, repr=True) + + def __post_init__(self) -> None: + """Validates init values and initializes attributes.""" + path_config = self.package_config.path_config + + # Validate that the temp config file exists. + validate_file_exists(self.package_config.temp_config_file_path) + + # Set clp_instance_id from instance-id file. + clp_instance_id_file_path = path_config.clp_log_dir / "instance-id" + validate_file_exists(clp_instance_id_file_path) + clp_instance_id = self._get_clp_instance_id(clp_instance_id_file_path) + object.__setattr__(self, "clp_instance_id", clp_instance_id) + + # Set shared_config_file_path and validate it exists. 
+ shared_config_file_path = path_config.clp_log_dir / CLP_SHARED_CONFIG_FILENAME + validate_file_exists(shared_config_file_path) + object.__setattr__(self, "shared_config_file_path", shared_config_file_path) + + @staticmethod + def _get_clp_instance_id(clp_instance_id_file_path: Path) -> str: + """ + Reads the CLP instance ID from the given file and validates its format. + + :param clp_instance_id_file_path: + :return: The 4-character hexadecimal instance ID. + :raise ValueError: If the file cannot be read or contents are not a 4-character hex string. + """ + try: + contents = clp_instance_id_file_path.read_text(encoding="utf-8").strip() + except OSError as err: + err_msg = f"Cannot read instance-id file '{clp_instance_id_file_path}'" + raise ValueError(err_msg) from err + + if not re.fullmatch(r"[0-9a-fA-F]{4}", contents): + err_msg = ( + f"Invalid instance ID in {clp_instance_id_file_path}: expected a 4-character" + f" hexadecimal string, but read {contents}." + ) + raise ValueError(err_msg) + + return contents + @dataclass(frozen=True) class IntegrationTestPathConfig: diff --git a/integration-tests/tests/utils/docker_utils.py b/integration-tests/tests/utils/docker_utils.py new file mode 100644 index 0000000000..ebab76e46c --- /dev/null +++ b/integration-tests/tests/utils/docker_utils.py @@ -0,0 +1,61 @@ +"""Provide utility functions related to the use of Docker during integration tests.""" + +import re +import shutil +import subprocess +from enum import Enum + + +class DockerStatus(str, Enum): + """Enum of possible Docker Statuses.""" + + created = "created" + restarting = "restarting" + running = "running" + removing = "removing" + paused = "paused" + exited = "exited" + dead = "dead" + + +def get_docker_binary_path() -> str: + """ + Finds the absolute path to the Docker client in the current PATH. + + :return: Absolute path to the Docker binary. + :raise RuntimeError: docker is not found on PATH. 
+ """ + docker_bin = shutil.which("docker") + if docker_bin is None: + err_msg = "docker not found in PATH" + raise RuntimeError(err_msg) + return docker_bin + + +def list_running_containers_with_prefix(prefix: str) -> list[str]: + """ + Lists running Docker containers whose names begin with `prefix` and end with one or more digits. + + :param prefix: + :return: List of running container names that match the pattern. + """ + docker_bin = get_docker_binary_path() + + # fmt: off + docker_ps_cmd = [ + docker_bin, + "ps", + "--format", "{{.Names}}", + "--filter", f"name={prefix}", + "--filter", f"status={DockerStatus.running}", + ] + # fmt: on + ps_proc = subprocess.run(docker_ps_cmd, stdout=subprocess.PIPE, text=True, check=True) + + matches: list[str] = [] + for line in (ps_proc.stdout or "").splitlines(): + name_candidate = line.strip() + if re.fullmatch(re.escape(prefix) + r"\d+", name_candidate): + matches.append(name_candidate) + + return matches diff --git a/integration-tests/tests/utils/package_utils.py b/integration-tests/tests/utils/package_utils.py new file mode 100644 index 0000000000..ef996b21cb --- /dev/null +++ b/integration-tests/tests/utils/package_utils.py @@ -0,0 +1,217 @@ +"""Provides utility functions for interacting with the CLP package.""" + +import logging +import subprocess + +import pytest + +from tests.utils.asserting_utils import run_and_assert +from tests.utils.config import ( + IntegrationTestLogs, + PackageCompressJob, + PackageConfig, + PackageInstance, + PackageSearchJob, +) + +logger = logging.getLogger(__name__) + + +def start_clp_package(package_config: PackageConfig) -> None: + """ + Starts an instance of the CLP package. + + :param package_config: + :raise RuntimeError: If the package fails to start. 
+ """ + path_config = package_config.path_config + start_script_path = path_config.start_script_path + temp_config_file_path = package_config.temp_config_file_path + try: + # fmt: off + start_cmd = [ + str(start_script_path), + "--config", str(temp_config_file_path), + ] + # fmt: on + subprocess.run(start_cmd, check=True) + except Exception as err: + err_msg = f"Failed to start an instance of the {package_config.mode_name} package." + raise RuntimeError(err_msg) from err + + +def stop_clp_package(instance: PackageInstance) -> None: + """ + Stops an instance of the CLP package. + + :param instance: + :raise RuntimeError: If the package fails to stop. + """ + package_config = instance.package_config + path_config = package_config.path_config + stop_script_path = path_config.stop_script_path + try: + # fmt: off + stop_cmd = [ + str(stop_script_path) + ] + # fmt: on + subprocess.run(stop_cmd, check=True) + except Exception as err: + err_msg = f"Failed to stop an instance of the {package_config.mode_name} package." + raise RuntimeError(err_msg) from err + + +def compress_with_clp_package( + request: pytest.FixtureRequest, + compress_job: PackageCompressJob, + package_instance: PackageInstance, +) -> None: + """ + Construct and run a compression command for the CLP package. + + :param request: + :param compress_job: + :param package_instance: + """ + package_config = package_instance.package_config + compress_script_path = package_config.path_config.compress_script_path + temp_config_file_path = package_config.temp_config_file_path + # Get the correct logs fixture for this job and set up path config objects. + integration_test_logs: IntegrationTestLogs = request.getfixturevalue( + compress_job.log_fixture_name + ) + + # Construct the compression command for this job. 
+    compress_cmd = [
+        str(compress_script_path),
+        "--config",
+        str(temp_config_file_path),
+    ]
+
+    if compress_job.dataset_name is not None:
+        compress_cmd.extend(
+            [
+                "--dataset",
+                compress_job.dataset_name,
+            ]
+        )
+
+    if compress_job.timestamp_key is not None:
+        compress_cmd.extend(
+            [
+                "--timestamp-key",
+                compress_job.timestamp_key,
+            ]
+        )
+
+    if compress_job.unstructured:
+        compress_cmd.append("--unstructured")
+
+    if compress_job.tags is not None:
+        compress_cmd.extend(
+            [
+                "-t",
+                ",".join(compress_job.tags),
+            ]
+        )
+
+    if compress_job.subpath is not None:
+        compress_cmd.append(str(integration_test_logs.extraction_dir / compress_job.subpath))
+    else:
+        compress_cmd.append(str(integration_test_logs.extraction_dir))
+
+    # Run compression command for this job and assert that it succeeds.
+    run_and_assert(compress_cmd)
+
+
+def search_with_clp_package(
+    search_job: PackageSearchJob,
+    package_instance: PackageInstance,
+) -> None:
+    """
+    Construct and run a search command for the CLP package.
+
+    :param search_job:
+    :param package_instance:
+    """
+    package_config = package_instance.package_config
+    search_script_path = package_config.path_config.search_script_path
+    temp_config_file_path = package_config.temp_config_file_path
+
+    # Construct the search command for this job.
+ search_cmd = [ + str(search_script_path), + "--config", + str(temp_config_file_path), + ] + if search_job.package_compress_job.dataset_name is not None: + search_cmd.extend( + [ + "--dataset", + search_job.package_compress_job.dataset_name, + ] + ) + if search_job.package_compress_job.tags is not None: + search_cmd.extend( + [ + "-t", + ",".join(search_job.package_compress_job.tags), + ] + ) + if search_job.begin_time is not None: + search_cmd.extend( + [ + "--begin-time", + str(search_job.begin_time), + ] + ) + if search_job.end_time is not None: + search_cmd.extend( + [ + "--end-time", + str(search_job.end_time), + ] + ) + if search_job.ignore_case: + search_cmd.append("--ignore-case") + if search_job.file_path is not None: + search_cmd.extend( + [ + "--file-path", + str(search_job.file_path), + ] + ) + if search_job.count: + search_cmd.append("--count") + if search_job.count_by_time is not None: + search_cmd.extend( + [ + "--count-by-time", + str(search_job.count_by_time), + ] + ) + search_cmd.append("--raw") + search_cmd.append(search_job.wildcard_query) + + # Run search command for this job and capture the output. + completed_process = run_and_assert( + search_cmd, + capture_output=True, + text=True, + ) + + # Compare captured output to `desired_result`. 
+    search_output = completed_process.stdout
+    expected_output = search_job.desired_result
+
+    if search_output != expected_output:
+        error_message = (
+            f"Search output for job '{search_job.job_name}' did not match desired_result.\n"
+            "Expected:\n"
+            f"{expected_output}\n"
+            "Actual:\n"
+            f"{search_output}"
+        )
+        pytest.fail(error_message)
diff --git a/integration-tests/tests/utils/port_utils.py b/integration-tests/tests/utils/port_utils.py
new file mode 100644
index 0000000000..cfd1608533
--- /dev/null
+++ b/integration-tests/tests/utils/port_utils.py
@@ -0,0 +1,74 @@
+"""Functions for facilitating the port connections for the CLP package."""
+
+from clp_py_utils.clp_config import ClpConfig
+
+MAX_REQUIRED_PORTS = 10
+SYSTEM_PORTS_BOUNDARY = 1023
+MAX_PORT = 65535
+
+
+def _validate_base_port(base_port: int) -> None:
+    """
+    Validate that `base_port` plus `MAX_REQUIRED_PORTS` stays within [1024, 65535].
+
+    :param base_port:
+    :raise ValueError: if the range exceeds the valid TCP port range.
+    """
+    if base_port <= SYSTEM_PORTS_BOUNDARY:
+        err_msg = (
+            f"BASE_PORT number should be larger than {SYSTEM_PORTS_BOUNDARY}, got {base_port}."
+        )
+        raise ValueError(err_msg)
+    if base_port + MAX_REQUIRED_PORTS - 1 > MAX_PORT:
+        err_msg = (
+            f"BASE_PORT={base_port} leaves insufficient headroom for {MAX_REQUIRED_PORTS} ports."
+        )
+        raise ValueError(err_msg)
+
+
+def assign_ports_from_base(base_port: int, clp_config: ClpConfig) -> None:
+    """
+    Assign ports for all components that require a port in `clp_config`. Ports are assigned
+    relative to `base_port`.
+
+    :param base_port:
+    :param clp_config:
+    """
+    # Ensure base_port is valid and that there's enough room to assign all ports.
+ _validate_base_port(base_port=base_port) + + current_port = base_port + + clp_config.database.port = current_port + current_port += 1 + + clp_config.queue.port = current_port + current_port += 1 + + clp_config.redis.port = current_port + current_port += 1 + + # Reducer uses `base_port` instead of `port` + clp_config.reducer.base_port = current_port + current_port += 1 + + clp_config.results_cache.port = current_port + current_port += 1 + + clp_config.query_scheduler.port = current_port + current_port += 1 + + clp_config.api_server.port = current_port + current_port += 1 + + clp_config.webui.port = current_port + current_port += 1 + + # Optional services + if clp_config.mcp_server is not None: + clp_config.mcp_server.port = current_port + current_port += 1 + + if clp_config.presto is not None: + clp_config.presto.port = current_port + current_port += 1 diff --git a/integration-tests/tests/utils/utils.py b/integration-tests/tests/utils/utils.py index 6d856c5551..ba3a85f9eb 100644 --- a/integration-tests/tests/utils/utils.py +++ b/integration-tests/tests/utils/utils.py @@ -5,7 +5,9 @@ import subprocess from pathlib import Path from tempfile import NamedTemporaryFile -from typing import IO +from typing import Any, IO + +import yaml def get_env_var(var_name: str) -> str: @@ -51,6 +53,33 @@ def is_json_file_structurally_equal(json_fp1: Path, json_fp2: Path) -> bool: return is_dir_tree_content_equal(Path(temp_file_1.name), Path(temp_file_2.name)) +def load_yaml_to_dict(path: Path) -> dict[str, Any]: + """ + Parses a UTF-8 YAML file into a dictionary. + + :param path: + :return: Dictionary parsed from the file. + :raise ValueError: if the file contains invalid YAML. + :raise ValueError: if the file cannot be read. + :raise TypeError: if the file does not have a top-level mapping. 
+ """ + try: + with path.open("r", encoding="utf-8") as file: + target_dict = yaml.safe_load(file) + except yaml.YAMLError as err: + err_msg = f"Invalid YAML in target file '{path}'" + raise ValueError(err_msg) from err + except OSError as err: + err_msg = f"Cannot read target file '{path}'" + raise ValueError(err_msg) from err + + if not isinstance(target_dict, dict): + err_msg = f"Target file {path} must have a top-level mapping." + raise TypeError(err_msg) + + return target_dict + + def resolve_path_env_var(var_name: str) -> Path: """ :param var_name: Name of the environment variable holding a path. @@ -96,6 +125,16 @@ def validate_dir_exists(dir_path: Path) -> None: raise ValueError(err_msg) +def validate_file_exists(file_path: Path) -> None: + """ + :param file_path: + :raise ValueError: if the path does not exist or is not a file. + """ + if not file_path.is_file(): + err_msg = f"Path does not exist or is not a file: {file_path}" + raise ValueError(err_msg) + + def _sort_json_keys_and_rows(json_fp: Path) -> IO[str]: """ Normalize a JSON file to a stable, deterministically ordered form for comparison. 
diff --git a/integration-tests/uv.lock b/integration-tests/uv.lock index 1f30562b1d..a9dd0f3b5f 100644 --- a/integration-tests/uv.lock +++ b/integration-tests/uv.lock @@ -903,7 +903,9 @@ dev = [ { name = "mypy" }, { name = "pytest" }, { name = "pytest-env" }, + { name = "pyyaml" }, { name = "ruff" }, + { name = "types-pyyaml" }, ] [package.metadata] @@ -919,7 +921,9 @@ dev = [ { name = "mypy", specifier = ">=1.16.0" }, { name = "pytest", specifier = ">=8.4.1" }, { name = "pytest-env", specifier = ">=1.1.5" }, + { name = "pyyaml", specifier = ">=6.0" }, { name = "ruff", specifier = ">=0.11.12" }, + { name = "types-pyyaml", specifier = ">=6.0.12.20240808" }, ] [[package]] @@ -2506,6 +2510,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/77/b8/0135fadc89e73be292b473cb820b4f5a08197779206b33191e801feeae40/tomli-2.3.0-py3-none-any.whl", hash = "sha256:e95b1af3c5b07d9e643909b5abbec77cd9f1217e6d0bca72b0234736b9fb1f1b", size = 14408, upload-time = "2025-10-08T22:01:46.04Z" }, ] +[[package]] +name = "types-pyyaml" +version = "6.0.12.20250915" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/69/3c51b36d04da19b92f9e815be12753125bd8bc247ba0470a982e6979e71c/types_pyyaml-6.0.12.20250915.tar.gz", hash = "sha256:0f8b54a528c303f0e6f7165687dd33fafa81c807fcac23f632b63aa624ced1d3", size = 17522, upload-time = "2025-09-15T03:01:00.728Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/e0/1eed384f02555dde685fff1a1ac805c1c7dcb6dd019c916fe659b1c1f9ec/types_pyyaml-6.0.12.20250915-py3-none-any.whl", hash = "sha256:e7d4d9e064e89a3b3cae120b4990cd370874d2bf12fa5f46c97018dd5d3c9ab6", size = 20338, upload-time = "2025-09-15T03:00:59.218Z" }, +] + [[package]] name = "typing-extensions" version = "4.15.0" diff --git a/taskfiles/tests/integration.yaml b/taskfiles/tests/integration.yaml index 69bd5a3e9c..cd7c8e34e9 100644 --- a/taskfiles/tests/integration.yaml +++ b/taskfiles/tests/integration.yaml @@ 
-27,3 +27,13 @@ tasks: clp-py-project-imports: dir: "{{.G_INTEGRATION_TESTS_DIR}}" cmd: "uv run pytest tests/test_clp_native_py_project_imports.py" + + package: + deps: + - task: "::package" + dir: "{{.G_INTEGRATION_TESTS_DIR}}" + env: + CLP_BUILD_DIR: "{{.G_BUILD_DIR}}" + CLP_CORE_BINS_DIR: "{{.G_CORE_COMPONENT_BUILD_DIR}}" + CLP_PACKAGE_DIR: "{{.G_PACKAGE_BUILD_DIR}}" + cmd: "uv run pytest -m package"