diff --git a/pyproject.toml b/pyproject.toml index 6e0133da..26e1d286 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ repository = 'https://github.com/RedHatProductSecurity/trestle-bot' trestlebot-autosync = "trestlebot.entrypoints.autosync:main" trestlebot-rules-transform = "trestlebot.entrypoints.rule_transform:main" trestlebot-create-cd = "trestlebot.entrypoints.create_cd:main" +trestlebot-sync-upstreams = "trestlebot.entrypoints.sync_upstreams:main" [tool.poetry.dependencies] python = '^3.8.1' diff --git a/tests/conftest.py b/tests/conftest.py index c665b2b5..a7b88a6b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -36,6 +36,7 @@ TRESTLEBOT_TEST_IMAGE_NAME, build_test_image, clean, + repo_setup, ) from trestlebot import const from trestlebot.transformers.trestle_rule import ( @@ -58,28 +59,7 @@ def tmp_repo() -> YieldFixture[Tuple[str, Repo]]: """Create a temporary git repository with an initialized trestle workspace root""" with TemporaryDirectory(prefix=_TEST_PREFIX) as tmpdir: tmp_path = pathlib.Path(tmpdir) - try: - args = argparse.Namespace( - verbose=0, - trestle_root=tmp_path, - full=True, - local=False, - govdocs=False, - ) - init = InitCmd() - init._run(args) - except Exception as e: - raise TrestleError( - f"Initialization failed for temporary trestle directory: {e}." - ) - repo = Repo.init(tmpdir) - with repo.config_writer() as config: - config.set_value("user", "email", "test@example.com") - config.set_value("user", "name", "Test User") - repo.git.add(all=True) - repo.index.commit("Initial commit") - # Create a default branch (main) - repo.git.checkout("-b", "main") + repo: Repo = repo_setup(tmp_path) yield tmpdir, repo try: diff --git a/tests/e2e/test_e2e_ssp.py b/tests/e2e/test_e2e_ssp.py index bdad7c7e..658fec34 100644 --- a/tests/e2e/test_e2e_ssp.py +++ b/tests/e2e/test_e2e_ssp.py @@ -18,7 +18,14 @@ E2E tests for SSP creation and autosync workflow. Notes that this should be the only E2E for auto-syncing since the UX is the same for each model. -Any model specific test should be under workflows. +The SSP model is used here as a stand-in for all models because it is the most complex process. + +The tests here are based on the following workflow: +1. Create new SSP +2. Autosync SSP to create initial Markdown +3. Run autosync again to check that no changes are pushed +4. Update the profile with sync-upstreams +5. Autosync again to check that the changes are pushed """ import logging @@ -34,7 +41,13 @@ from trestle.core.models.file_content_type import FileContentType from trestle.oscal.ssp import SystemSecurityPlan -from tests.testutils import build_test_command, setup_for_ssp +from tests.testutils import ( + UPSTREAM_REPO, + build_test_command, + clean, + prepare_upstream_repo, + setup_for_ssp, +) from trestlebot.const import ERROR_EXIT_CODE, INVALID_ARGS_EXIT_CODE, SUCCESS_EXIT_CODE from trestlebot.tasks.authored.ssp import AuthoredSSP, SSPIndex @@ -112,7 +125,6 @@ def test_ssp_editing_e2e( args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md) # Create or generate the SSP - if not skip_create: index_path = os.path.join(tmp_repo_str, "ssp-index.json") ssp_index = SSPIndex(index_path) @@ -157,3 +169,44 @@ def test_ssp_editing_e2e( assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name] assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None assert ssp_path.exists() + + # Check that if run again, the ssp is not pushed again + command = build_test_command(tmp_repo_str, "autosync", command_args, image_name) + run_response = subprocess.run(command, capture_output=True) + assert run_response.returncode == SUCCESS_EXIT_CODE + assert "Nothing to commit" in run_response.stdout.decode("utf-8") + + # Check that if the upstream profile is updated, the ssp is updated + local_upstream_path = prepare_upstream_repo() + upstream_repos_arg = f"{UPSTREAM_REPO}@main" + upstream_command_args = { + "branch": command_args["branch"], + "committer-name": command_args["committer-name"], + "committer-email": command_args["committer-email"], + "sources": upstream_repos_arg, + } + command = build_test_command( + tmp_repo_str, + "sync-upstreams", + upstream_command_args, + image_name, + local_upstream_path, + ) + run_response = subprocess.run(command, capture_output=True) + assert run_response.returncode == SUCCESS_EXIT_CODE + assert ( + f"Changes pushed to {command_args['branch']} successfully." + in run_response.stdout.decode("utf-8") + ) + + # Autosync again to check that the ssp is updated + command = build_test_command(tmp_repo_str, "autosync", command_args, image_name) + run_response = subprocess.run(command, capture_output=True) + assert run_response.returncode == SUCCESS_EXIT_CODE + assert ( + f"Changes pushed to {command_args['branch']} successfully." + in run_response.stdout.decode("utf-8") + ) + + # Clean up the upstream repo + clean(local_upstream_path, None) diff --git a/tests/testutils.py b/tests/testutils.py index 7c21a116..b49c6c5b 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -21,11 +21,14 @@ import pathlib import shutil import subprocess +import tempfile from typing import Dict, List, Optional from git.repo import Repo +from trestle.common.err import TrestleError from trestle.common.model_utils import ModelUtils from trestle.core.base_model import OscalBaseModel +from trestle.core.commands.init import InitCmd from trestle.core.models.file_content_type import FileContentType from trestle.oscal import catalog as cat from trestle.oscal import component as comp @@ -49,6 +52,9 @@ E2E_BUILD_CONTEXT = "tests/e2e" CONTAINER_FILE_NAME = "Dockerfile" +# Location the upstream repo is mounted to in the container +UPSTREAM_REPO = "/upstream" + def clean(repo_path: str, repo: Optional[Repo]) -> None: """Clean up the temporary Git repository.""" @@ -57,6 +63,33 @@ def clean(repo_path: str, repo: Optional[Repo]) -> None: shutil.rmtree(repo_path) +def repo_setup(repo_path: pathlib.Path) -> Repo: + """Create a temporary Git repository.""" + try: + args = argparse.Namespace( + verbose=0, + trestle_root=repo_path, + full=True, + local=False, + govdocs=False, + ) + init = InitCmd() + init._run(args) + except Exception as e: + raise TrestleError( + f"Initialization failed for temporary trestle directory: {e}." + ) + repo = Repo.init(repo_path) + with repo.config_writer() as config: + config.set_value("user", "email", "test@example.com") + config.set_value("user", "name", "Test User") + repo.git.add(all=True) + repo.index.commit("Initial commit") + # Create a default branch (main) + repo.git.checkout("-b", "main") + return repo + + def args_dict_to_list(args_dict: Dict[str, str]) -> List[str]: """Transform dictionary of args to a list of args.""" args = [] @@ -321,9 +354,22 @@ def build_test_command( command_name: str, command_args: Dict[str, str], image_name: str = TRESTLEBOT_TEST_IMAGE_NAME, + upstream_repo: str = "", ) -> List[str]: - """Build a command to be run in the shell for trestlebot""" - return [ + """ + Build a command to be run in the shell for trestlebot + + Args: + data_path (str): Path to the data directory. This is the working directory/trestle_root. + command_name (str): Name of the command to run. It should be a trestlebot command. + command_args (Dict[str, str]): Arguments to pass to the command + image_name (str, optional): Name of the image to run. Defaults to TRESTLEBOT_TEST_IMAGE_NAME. + upstream_repo (str, optional): Path to the upstream repo. Defaults to "" and is not mounted. + + Returns: + List[str]: Command to be run in the shell + """ + command = [ "podman", "run", "--pod", @@ -331,10 +377,57 @@ def build_test_command( "--entrypoint", f"trestlebot-{command_name}", "--rm", - "-v", - f"{data_path}:/trestle", - "-w", - "/trestle", - image_name, - *args_dict_to_list(command_args), ] + + # Add mounts + if upstream_repo: + # Add a volume and mount it to the container + command.extend(["-v", f"{upstream_repo}:{UPSTREAM_REPO}"]) + command.extend( + [ + "-v", + f"{data_path}:/trestle", + "-w", + "/trestle", + image_name, + *args_dict_to_list(command_args), + ] + ) + return command + + +def prepare_upstream_repo() -> str: + """ + Prepare a temporary upstream repo for testing. + + Returns: + str: Path to the upstream repo + + Notes: + This includes the test NIST catalog and a modified profile. + It modifies the simplified_nist_profile to simulate upstream + changes for testing. + """ + tmp_dir = pathlib.Path(tempfile.mkdtemp()) + repo: Repo = repo_setup(tmp_dir) + load_from_json( + tmp_dir, "simplified_nist_catalog", "simplified_nist_catalog", cat.Catalog + ) + + # Modify the profile to include an additional control and write it out + src_path = JSON_TEST_DATA_PATH / "simplified_nist_profile.json" + dst_path: pathlib.Path = ModelUtils.get_model_path_for_name_and_class( + tmp_dir, "simplified_nist_profile", prof.Profile, FileContentType.JSON # type: ignore + ) + dst_path.parent.mkdir(parents=True, exist_ok=True) + + test_profile: prof.Profile = prof.Profile.oscal_read(src_path) + + prof_import: prof.Import = test_profile.imports[0] + prof_import.include_controls[0].with_ids.append(prof.WithId(__root__="ac-6")) + test_profile.oscal_write(dst_path) + + repo.git.add(all=True) + repo.index.commit("Add updated profile") + repo.close() + return str(tmp_dir) diff --git a/tests/trestlebot/entrypoints/test_sync_upstreams.py b/tests/trestlebot/entrypoints/test_sync_upstreams.py new file mode 100644 index 00000000..e1802de9 --- /dev/null +++ b/tests/trestlebot/entrypoints/test_sync_upstreams.py @@ -0,0 +1,155 @@ +#!/usr/bin/python + +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Test for Sync Upstreams CLI""" + +import logging +from typing import Any, Dict, Tuple +from unittest.mock import patch + +import pytest +from git import Repo + +from tests.testutils import args_dict_to_list, clean, prepare_upstream_repo +from trestlebot.entrypoints.sync_upstreams import main as cli_main + + +@pytest.fixture +def valid_args_dict() -> Dict[str, str]: + return { + "branch": "main", + "sources": "valid_source", + "committer-name": "test", + "committer-email": "test@email.com", + "working-dir": ".", + "file-patterns": ".", + } + + +test_cat = "simplified_nist_catalog" +test_cat_path = "catalogs/simplified_nist_catalog/catalog.json" +test_prof = "simplified_nist_profile" +test_prof_path = "profiles/simplified_nist_profile/profile.json" +test_repo_url = "git.test.com/test/repo.git" + + +def test_sync_upstreams( + tmp_repo: Tuple[str, Repo], valid_args_dict: Dict[str, str] +) -> None: + """Test sync upstreams with default settings and valid args.""" + repo_path, repo = tmp_repo + repo.create_remote("origin", url=test_repo_url) + + args_dict = valid_args_dict + args_dict["working-dir"] = repo_path + + source: str = prepare_upstream_repo() + + args_dict["sources"] = f"{source}@main" + + with patch("git.remote.Remote.push") as mock_push, patch( + "sys.argv", ["trestlebot", *args_dict_to_list(args_dict)] + ): + mock_push.return_value = "Mocked Results" + with pytest.raises(SystemExit, match="0"): + cli_main() + + # Verify that the correct files were included + commit = next(repo.iter_commits()) + assert test_cat_path in commit.stats.files + assert test_prof_path in commit.stats.files + assert len(commit.stats.files) == 2 + + # Clean up the source repo + clean(source, None) + + +def test_with_include_model_names( + tmp_repo: Tuple[str, Repo], valid_args_dict: Dict[str, str] +) -> None: + """Test sync upstreams with include model names flag.""" + repo_path, repo = tmp_repo + repo.create_remote("origin", url=test_repo_url) + + args_dict = valid_args_dict + args_dict["include-model-names"] = test_cat + args_dict["working-dir"] = repo_path + + source: str = prepare_upstream_repo() + + args_dict["sources"] = f"{source}@main" + + with patch("git.remote.Remote.push") as mock_push, patch( + "sys.argv", ["trestlebot", *args_dict_to_list(args_dict)] + ): + mock_push.return_value = "Mocked Results" + with pytest.raises(SystemExit, match="0"): + cli_main() + + # Verify that the correct files were included + commit = next(repo.iter_commits()) + assert test_cat_path in commit.stats.files + assert len(commit.stats.files) == 1 + + # Clean up the source repo + clean(source, None) + + +def test_with_exclude_model_names( + tmp_repo: Tuple[str, Repo], valid_args_dict: Dict[str, str] +) -> None: + """Test sync upstreams with exclude model names flag.""" + repo_path, repo = tmp_repo + repo.create_remote("origin", url=test_repo_url) + + args_dict = valid_args_dict + args_dict["exclude-model-names"] = test_prof + args_dict["working-dir"] = repo_path + + source: str = prepare_upstream_repo() + args_dict["sources"] = f"{source}@main" + + with patch("git.remote.Remote.push") as mock_push, patch( + "sys.argv", ["trestlebot", *args_dict_to_list(args_dict)] + ): + mock_push.return_value = "Mocked Results" + with pytest.raises(SystemExit, match="0"): + cli_main() + + # Verify that the profile was excluded + commit = next(repo.iter_commits()) + assert test_cat_path in commit.stats.files + assert len(commit.stats.files) == 1 + + # Clean up the source repo + clean(source, None) + + +def test_with_no_sources(valid_args_dict: Dict[str, str], caplog: Any) -> None: + """Test with an invalid source argument.""" + args_dict = valid_args_dict + args_dict["sources"] = "" + + with patch("sys.argv", ["trestlebot", *args_dict_to_list(args_dict)]): + with pytest.raises(SystemExit, match="2"): + cli_main() + + assert any( + record.levelno == logging.ERROR + and "Invalid args --sources: Must set at least one source to sync from." + in record.message + for record in caplog.records + ) diff --git a/trestlebot/entrypoints/sync_upstreams.py b/trestlebot/entrypoints/sync_upstreams.py new file mode 100644 index 00000000..c5d0394c --- /dev/null +++ b/trestlebot/entrypoints/sync_upstreams.py @@ -0,0 +1,131 @@ +#!/usr/bin/python + +# Copyright 2024 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Entrypoint for synchronizing content from upstreams git sources. + +Note: This currently does not following imports of the synced OSCAL content. +""" + +import argparse +import logging +import sys +from typing import List + +from trestlebot.const import SUCCESS_EXIT_CODE +from trestlebot.entrypoints.entrypoint_base import ( + EntrypointBase, + EntrypointInvalidArgException, + comma_sep_to_list, + handle_exception, +) +from trestlebot.entrypoints.log import set_log_level_from_args +from trestlebot.tasks.base_task import ModelFilter, TaskBase +from trestlebot.tasks.sync_upstreams_task import SyncUpstreamsTask + + +logger = logging.getLogger(__name__) + + +class SyncUpstreamsEntrypoint(EntrypointBase): + """Entrypoint for the sync upstreams operation.""" + + def __init__(self, parser: argparse.ArgumentParser) -> None: + """Initialize.""" + # Setup base arguments + super().__init__(parser) + self.setup_sync_upstreams_arguments() + + def setup_sync_upstreams_arguments(self) -> None: + """Setup arguments for the sync upstreams entrypoint.""" + self.parser.add_argument( + "--sources", + type=str, + required=True, + help="Comma-separated list of upstream git sources to fetch from. Each source is a string \ + of the form @ where ref is a git ref such as a tag or branch.", + ) + self.parser.add_argument( + "--include-model-names", + type=str, + required=False, + help="Comma-separated list of glob patterns for model names to include when running \ + tasks (e.g. --include-models=component_x,profile_y*)", + ) + self.parser.add_argument( + "--exclude-model-names", + type=str, + required=False, + help="Comma-separated list of glob patterns for model names to exclude when running \ + tasks (e.g. --exclude-models=component_x,profile_y*)", + ) + self.parser.add_argument( + "--skip-validation", + action="store_true", + help="Skip validation of the models when they are copied", + ) + + def run(self, args: argparse.Namespace) -> None: + """Run the sync upstreams entrypoint.""" + exit_code: int = SUCCESS_EXIT_CODE + try: + set_log_level_from_args(args) + if not args.sources: + raise EntrypointInvalidArgException( + "--sources", "Must set at least one source to sync from." + ) + + # Assume that if exclude_model_names is not set, then + # skip nothing and if include_model_names is not set, then include all. + include_model_list: List[str] = ["*"] + if args.include_model_names: + include_model_list = comma_sep_to_list(args.include_model_names) + model_filter: ModelFilter = ModelFilter( + skip_patterns=comma_sep_to_list(args.exclude_model_names), + include_patterns=include_model_list, + ) + + validate: bool = not args.skip_validation + + sync_upstreams_task: TaskBase = SyncUpstreamsTask( + working_dir=args.working_dir, + git_sources=comma_sep_to_list(args.sources), + model_filter=model_filter, + validate=validate, + ) + pre_tasks: List[TaskBase] = [sync_upstreams_task] + + super().run_base(args, pre_tasks) + except Exception as e: + exit_code = handle_exception(e) + + sys.exit(exit_code) + + +def main() -> None: + """Run the CLI.""" + parser = argparse.ArgumentParser( + description="Sync content from upstreams git sources." + ) + sync_upstreams = SyncUpstreamsEntrypoint(parser=parser) + + args = parser.parse_args() + + sync_upstreams.run(args) + + +if __name__ == "__main__": + main()