Skip to content

Commit

Permalink
PSCE-303 feat: adds trestlebot-sync-upstreams command (#142)
Browse files Browse the repository at this point in the history
* feat(entrypoint): adds sync-upstreams entrypoint

E2E tests are updated to include a step with sync_upstreams
to ssp-authoring workflow

Signed-off-by: Jennifer Power <barnabei.jennifer@gmail.com>

* chore: removes additional test data and modifies source test profile

We don't need to store an additional test file with small edits. This
updates the prep_upstream_repo function to just alter the profile and
write it out.

Signed-off-by: Jennifer Power <barnabei.jennifer@gmail.com>

* chore: incorporates changes based on PR feedback

Signed-off-by: Jennifer Power <barnabei.jennifer@gmail.com>

* chore: updates include_models/exclude_models to model_names

Using the term include_models/exclude_models could leave confusion about what
is being filtered in this instance (model vs model instance). Updating
to model_names to be more precise.

Signed-off-by: Jennifer Power <barnabei.jennifer@gmail.com>

---------

Signed-off-by: Jennifer Power <barnabei.jennifer@gmail.com>
  • Loading branch information
jpower432 authored Jan 16, 2024
1 parent f7e32d1 commit c383112
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 33 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ repository = 'https://github.com/RedHatProductSecurity/trestle-bot'
trestlebot-autosync = "trestlebot.entrypoints.autosync:main"
trestlebot-rules-transform = "trestlebot.entrypoints.rule_transform:main"
trestlebot-create-cd = "trestlebot.entrypoints.create_cd:main"
trestlebot-sync-upstreams = "trestlebot.entrypoints.sync_upstreams:main"

[tool.poetry.dependencies]
python = '^3.8.1'
Expand Down
24 changes: 2 additions & 22 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
TRESTLEBOT_TEST_IMAGE_NAME,
build_test_image,
clean,
repo_setup,
)
from trestlebot import const
from trestlebot.transformers.trestle_rule import (
Expand All @@ -58,28 +59,7 @@ def tmp_repo() -> YieldFixture[Tuple[str, Repo]]:
"""Create a temporary git repository with an initialized trestle workspace root"""
with TemporaryDirectory(prefix=_TEST_PREFIX) as tmpdir:
tmp_path = pathlib.Path(tmpdir)
try:
args = argparse.Namespace(
verbose=0,
trestle_root=tmp_path,
full=True,
local=False,
govdocs=False,
)
init = InitCmd()
init._run(args)
except Exception as e:
raise TrestleError(
f"Initialization failed for temporary trestle directory: {e}."
)
repo = Repo.init(tmpdir)
with repo.config_writer() as config:
config.set_value("user", "email", "test@example.com")
config.set_value("user", "name", "Test User")
repo.git.add(all=True)
repo.index.commit("Initial commit")
# Create a default branch (main)
repo.git.checkout("-b", "main")
repo: Repo = repo_setup(tmp_path)
yield tmpdir, repo

try:
Expand Down
59 changes: 56 additions & 3 deletions tests/e2e/test_e2e_ssp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@
E2E tests for SSP creation and autosync workflow.
Notes that this should be the only E2E for auto-syncing since the UX is the same for each model.
Any model specific test should be under workflows.
The SSP model is used here as a stand-in for all models because it is the most complex process.
The tests here are based on the following workflow:
1. Create new SSP
2. Autosync SSP to create initial Markdown
3. Run autosync again to check that no changes are pushed
4. Update the profile with sync-upstreams
5. Autosync again to check that the changes are pushed
"""

import logging
Expand All @@ -34,7 +41,13 @@
from trestle.core.models.file_content_type import FileContentType
from trestle.oscal.ssp import SystemSecurityPlan

from tests.testutils import build_test_command, setup_for_ssp
from tests.testutils import (
UPSTREAM_REPO,
build_test_command,
clean,
prepare_upstream_repo,
setup_for_ssp,
)
from trestlebot.const import ERROR_EXIT_CODE, INVALID_ARGS_EXIT_CODE, SUCCESS_EXIT_CODE
from trestlebot.tasks.authored.ssp import AuthoredSSP, SSPIndex

Expand Down Expand Up @@ -112,7 +125,6 @@ def test_ssp_editing_e2e(
args = setup_for_ssp(tmp_repo_path, test_prof, [test_comp_name], test_ssp_md)

# Create or generate the SSP

if not skip_create:
index_path = os.path.join(tmp_repo_str, "ssp-index.json")
ssp_index = SSPIndex(index_path)
Expand Down Expand Up @@ -157,3 +169,44 @@ def test_ssp_editing_e2e(
assert ssp_index.get_comps_by_ssp(test_ssp_name) == [test_comp_name]
assert ssp_index.get_leveraged_by_ssp(test_ssp_name) is None
assert ssp_path.exists()

# Check that if run again, the ssp is not pushed again
command = build_test_command(tmp_repo_str, "autosync", command_args, image_name)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == SUCCESS_EXIT_CODE
assert "Nothing to commit" in run_response.stdout.decode("utf-8")

# Check that if the upstream profile is updated, the ssp is updated
local_upstream_path = prepare_upstream_repo()
upstream_repos_arg = f"{UPSTREAM_REPO}@main"
upstream_command_args = {
"branch": command_args["branch"],
"committer-name": command_args["committer-name"],
"committer-email": command_args["committer-email"],
"sources": upstream_repos_arg,
}
command = build_test_command(
tmp_repo_str,
"sync-upstreams",
upstream_command_args,
image_name,
local_upstream_path,
)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == SUCCESS_EXIT_CODE
assert (
f"Changes pushed to {command_args['branch']} successfully."
in run_response.stdout.decode("utf-8")
)

# Autosync again to check that the ssp is updated
command = build_test_command(tmp_repo_str, "autosync", command_args, image_name)
run_response = subprocess.run(command, capture_output=True)
assert run_response.returncode == SUCCESS_EXIT_CODE
assert (
f"Changes pushed to {command_args['branch']} successfully."
in run_response.stdout.decode("utf-8")
)

# Clean up the upstream repo
clean(local_upstream_path, None)
109 changes: 101 additions & 8 deletions tests/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import pathlib
import shutil
import subprocess
import tempfile
from typing import Dict, List, Optional

from git.repo import Repo
from trestle.common.err import TrestleError
from trestle.common.model_utils import ModelUtils
from trestle.core.base_model import OscalBaseModel
from trestle.core.commands.init import InitCmd
from trestle.core.models.file_content_type import FileContentType
from trestle.oscal import catalog as cat
from trestle.oscal import component as comp
Expand All @@ -49,6 +52,9 @@
E2E_BUILD_CONTEXT = "tests/e2e"
CONTAINER_FILE_NAME = "Dockerfile"

# Location the upstream repo is mounted to in the container
UPSTREAM_REPO = "/upstream"


def clean(repo_path: str, repo: Optional[Repo]) -> None:
"""Clean up the temporary Git repository."""
Expand All @@ -57,6 +63,33 @@ def clean(repo_path: str, repo: Optional[Repo]) -> None:
shutil.rmtree(repo_path)


def repo_setup(repo_path: pathlib.Path) -> Repo:
"""Create a temporary Git repository."""
try:
args = argparse.Namespace(
verbose=0,
trestle_root=repo_path,
full=True,
local=False,
govdocs=False,
)
init = InitCmd()
init._run(args)
except Exception as e:
raise TrestleError(
f"Initialization failed for temporary trestle directory: {e}."
)
repo = Repo.init(repo_path)
with repo.config_writer() as config:
config.set_value("user", "email", "test@example.com")
config.set_value("user", "name", "Test User")
repo.git.add(all=True)
repo.index.commit("Initial commit")
# Create a default branch (main)
repo.git.checkout("-b", "main")
return repo


def args_dict_to_list(args_dict: Dict[str, str]) -> List[str]:
"""Transform dictionary of args to a list of args."""
args = []
Expand Down Expand Up @@ -321,20 +354,80 @@ def build_test_command(
command_name: str,
command_args: Dict[str, str],
image_name: str = TRESTLEBOT_TEST_IMAGE_NAME,
upstream_repo: str = "",
) -> List[str]:
"""Build a command to be run in the shell for trestlebot"""
return [
"""
Build a command to be run in the shell for trestlebot
Args:
data_path (str): Path to the data directory. This is the working directory/trestle_root.
command_name (str): Name of the command to run. It should be a trestlebot command.
command_args (Dict[str, str]): Arguments to pass to the command
image_name (str, optional): Name of the image to run. Defaults to TRESTLEBOT_TEST_IMAGE_NAME.
upstream_repo (str, optional): Path to the upstream repo. Defaults to "" and is not mounted.
Returns:
List[str]: Command to be run in the shell
"""
command = [
"podman",
"run",
"--pod",
TRESTLEBOT_TEST_POD_NAME,
"--entrypoint",
f"trestlebot-{command_name}",
"--rm",
"-v",
f"{data_path}:/trestle",
"-w",
"/trestle",
image_name,
*args_dict_to_list(command_args),
]

# Add mounts
if upstream_repo:
# Add a volume and mount it to the container
command.extend(["-v", f"{upstream_repo}:{UPSTREAM_REPO}"])
command.extend(
[
"-v",
f"{data_path}:/trestle",
"-w",
"/trestle",
image_name,
*args_dict_to_list(command_args),
]
)
return command


def prepare_upstream_repo() -> str:
"""
Prepare a temporary upstream repo for testing.
Returns:
str: Path to the upstream repo
Notes:
This includes the test NIST catalog and a modified profile.
It modifies the simplified_nist_profile to simulate upstream
changes for testing.
"""
tmp_dir = pathlib.Path(tempfile.mkdtemp())
repo: Repo = repo_setup(tmp_dir)
load_from_json(
tmp_dir, "simplified_nist_catalog", "simplified_nist_catalog", cat.Catalog
)

# Modify the profile to include an additional control and write it out
src_path = JSON_TEST_DATA_PATH / "simplified_nist_profile.json"
dst_path: pathlib.Path = ModelUtils.get_model_path_for_name_and_class(
tmp_dir, "simplified_nist_profile", prof.Profile, FileContentType.JSON # type: ignore
)
dst_path.parent.mkdir(parents=True, exist_ok=True)

test_profile: prof.Profile = prof.Profile.oscal_read(src_path)

prof_import: prof.Import = test_profile.imports[0]
prof_import.include_controls[0].with_ids.append(prof.WithId(__root__="ac-6"))
test_profile.oscal_write(dst_path)

repo.git.add(all=True)
repo.index.commit("Add updated profile")
repo.close()
return str(tmp_dir)
Loading

0 comments on commit c383112

Please sign in to comment.