From 5353ebc86f285795ef078648279fd2c6820269ac Mon Sep 17 00:00:00 2001
From: eric-intuitem <71850047+eric-intuitem@users.noreply.github.com>
Date: Sat, 18 Jan 2025 11:09:04 +0100
Subject: [PATCH] export/import domain capabilities (#1376)

* Write serializers for exported models
* Add slug related fields to non-referential sluggable keys
* Add import/export serializer classes for Threat and ReferenceControl models
* Write unit tests for Vulnerability model
* Serialize models with their import/export serializer at dump time
* Add docstrings
* Remove filtering labels from export serializers
* Fix import order
* feat: add import endpoint and serializer validation
* feat: add topological sort for creation order
* Get objects to include in domain export
* Expose rudimentary domain export endpoint
* Serialize referential foreign keys using URN
* Serialize risk matrices and frameworks for export
* feat: order objects import with parent fields
* Hash primary keys on dump
* Serialize loaded libraries for export
* feat: move import to folders endpoint
* fix: self reference in dependency graph
* feat: start objects creation
* feat: refactor object validation/creation
* feat: handle complete scenario creation
* Write serializers for exported models
* Add slug related fields to non-referential sluggable keys
* Add import/export serializer classes for Threat and ReferenceControl models
* Write unit tests for Vulnerability model
* Serialize dump
* Serialize models with their import/export serializer at dump time
* Add docstrings
* Remove filtering labels from export serializers
* Remove outdated comment
* Fix import order
* Get objects to include in domain export
* Expose rudimentary domain export endpoint
* Serialize referential foreign keys using URN
* Serialize risk matrices and frameworks for export
* Hash primary keys on dump
* Serialize loaded libraries for export
* Update vulnerability import export unit tests
* Serialize evidence attachment size for export
* remove folders from domain export
* Serialize evidence attachment hash for export
* Compress domain dump file
* chore: Remove unused imports
* feat: add ebios rm objects import
* Use HttpResponse for domain export dump file
* feat: remove attachment hash and size before evidence creation
* feat: process gzip
* feat: prepare frontend folder import
* Port domain export to the frontend
* Remove redundant serializer field
* Pass enctype to folder import form as props
* Validate that folder import file is a file
* Manage domain dump upload
* chore: format codebase
* feat: reduce method complexity
* chore: format
* chore: format with good version
* fix: change import button data-testid
* style: improve colors and labels
* feat: add slugified domain name in export file name
* chore: format file input component in FolderForm
* locale: add fr translations
* chore: use match cases instead of elifs
* feat: handle name check error on frontend
* feat: validate backup version on import
* feat: add transaction to avoid object creation on failure
* Export attachments with evidence
* Write zipfile in memory and add some logging
* feat: improve error handling
* chore: ruff format
* Attachment upload PoC
* Fix domain import in enterprise frontend
* WIP
* fix attachment management
* ruff
* fix broken html/zip export of audit
* ruff

---------

Co-authored-by: Nassim Tabchiche
Co-authored-by: Mohamed-Hacene
---
 backend/core/models.py | 7 +
 backend/core/serializer_fields.py | 16 +
 backend/core/serializers.py | 368 +++++++-
backend/core/templates/snippets/req_node.html | 207 ++--- backend/core/tests/fixtures.py | 32 +- backend/core/tests/test_vulnerability.py | 117 +++ backend/core/views.py | 814 +++++++++++++++++- backend/ebios_rm/serializers.py | 176 +++- backend/library/serializers.py | 32 +- backend/serdes/serializers.py | 100 ++- backend/serdes/utils.py | 518 ++++++++++- backend/tprm/serializers.py | 20 +- .../Forms/ModelForm/FolderForm.svelte | 34 +- .../folders/[id=uuid]/+page.server.ts | 7 + .../(internal)/folders/[id=uuid]/+page.svelte | 25 + .../folders/[id=uuid]/export/+server.ts | 22 + frontend/messages/en.json | 8 +- frontend/messages/fr.json | 8 +- .../components/DetailView/DetailView.svelte | 1 + .../src/lib/components/Forms/ModelForm.svelte | 3 +- .../Forms/ModelForm/FolderForm.svelte | 32 + .../lib/components/Modals/CreateModal.svelte | 3 + frontend/src/lib/utils/actions.ts | 4 +- frontend/src/lib/utils/schemas.ts | 6 + .../[model=urlmodel]/+page.server.ts | 65 +- .../(internal)/[model=urlmodel]/+page.svelte | 41 + 26 files changed, 2474 insertions(+), 192 deletions(-) create mode 100644 backend/core/tests/test_vulnerability.py create mode 100644 enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.server.ts create mode 100644 enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.svelte create mode 100644 enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/export/+server.ts diff --git a/backend/core/models.py b/backend/core/models.py index 0a403d988..71308c4d3 100644 --- a/backend/core/models.py +++ b/backend/core/models.py @@ -1,6 +1,7 @@ import json import os import re +import hashlib from datetime import date, datetime from pathlib import Path from typing import Self, Type, Union @@ -1729,6 +1730,12 @@ def get_size(self): else: return f"{size / 1024 / 1024:.1f} MB" + @property + def attachment_hash(self): + if not self.attachment: + return None + return hashlib.sha256(self.attachment.read()).hexdigest() + class AppliedControl(NameDescriptionMixin, FolderMixin, PublishInRootFolderMixin): class Status(models.TextChoices): diff --git a/backend/core/serializer_fields.py b/backend/core/serializer_fields.py index 6884e87e4..1657efbae 100644 --- a/backend/core/serializer_fields.py +++ b/backend/core/serializer_fields.py @@ -1,10 +1,26 @@ +from hashlib import sha256 from typing import Any + from django.db import models from rest_framework import serializers from iam.models import Folder +class HashSlugRelatedField(serializers.SlugRelatedField): + """ + A custom SlugRelatedField that hashes the slug value during serialization. 
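+
+    Used with slug_field="pk" on export so that internal primary keys are
+    pseudonymized: a given key always maps to the same 12-character digest,
+    which keeps references between dumped objects consistent without
+    leaking database UUIDs.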
+ """ + + def to_representation(self, obj): + # Get the original slug value + value = super().to_representation(obj) + if value is None: + return None + # Hash the value + return sha256(str(value).encode()).hexdigest()[:12] + + class FieldsRelatedField(serializers.RelatedField): """ Serializer relational field that represents the target of the relationship by a diff --git a/backend/core/serializers.py b/backend/core/serializers.py index a68e6434d..e67c92c2a 100644 --- a/backend/core/serializers.py +++ b/backend/core/serializers.py @@ -1,19 +1,18 @@ import importlib from typing import Any -from ciso_assistant.settings import EMAIL_HOST, EMAIL_HOST_RESCUE +import structlog +from django.contrib.auth import get_user_model +from django.db import models +from ciso_assistant.settings import EMAIL_HOST, EMAIL_HOST_RESCUE from core.models import * -from iam.models import * +from core.serializer_fields import FieldsRelatedField, HashSlugRelatedField from ebios_rm.models import EbiosRMStudy +from iam.models import * from rest_framework import serializers from rest_framework.exceptions import PermissionDenied -from django.contrib.auth import get_user_model -from django.db import models -from core.serializer_fields import FieldsRelatedField - -import structlog logger = structlog.get_logger(__name__) @@ -131,6 +130,29 @@ class RiskMatrixWriteSerializer(RiskMatrixReadSerializer): pass +class RiskMatrixImportExportSerializer(BaseModelSerializer): + library = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + class Meta: + model = RiskMatrix + fields = [ + "created_at", + "updated_at", + "urn", + "name", + "description", + "ref_id", + "annotation", + "translations", + "locale", + "default_locale", + "library", + "is_enabled", + "provider", + "json_definition", + ] + + class VulnerabilityReadSerializer(BaseModelSerializer): folder = FieldsRelatedField() applied_controls = FieldsRelatedField(many=True) @@ -147,20 +169,26 @@ class Meta: exclude = ["created_at", "updated_at", "is_published"] -class RiskAcceptanceWriteSerializer(BaseModelSerializer): - # NOTE: This is a workaround to filter the approvers on api view - # but it causes some problems in api_tests. Serializers are - # called before to create users, so the approvers_id list - # is empty and the api_tests fail. 
- # approvers_id = [] - # try: - # for candidate in User.objects.all(): - # if RoleAssignment.has_permission(candidate, 'approve_riskacceptance'): - # approvers_id.append(candidate.id) - # except: - # pass - # approver = serializers.PrimaryKeyRelatedField(queryset=User.objects.filter(id__in=approvers_id)) +class VulnerabilityImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + applied_controls = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + class Meta: + model = Vulnerability + fields = [ + "ref_id", + "name", + "description", + "folder", + "status", + "severity", + "applied_controls", + "created_at", + "updated_at", + ] + + +class RiskAcceptanceWriteSerializer(BaseModelSerializer): class Meta: model = RiskAcceptance exclude = ["accepted_at", "rejected_at", "revoked_at", "state"] @@ -192,6 +220,22 @@ class Meta: fields = "__all__" +class ProjectImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = Project + fields = [ + "ref_id", + "name", + "description", + "folder", + "lc_status", + "created_at", + "updated_at", + ] + + class RiskAssessmentWriteSerializer(BaseModelSerializer): class Meta: model = RiskAssessment @@ -218,6 +262,33 @@ class Meta: exclude = [] +class RiskAssessmentImportExportSerializer(BaseModelSerializer): + risk_matrix = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + project = HashSlugRelatedField(slug_field="pk", read_only=True) + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = RiskAssessment + fields = [ + "ref_id", + "name", + "version", + "description", + "folder", + "project", + "eta", + "due_date", + "status", + "observation", + "risk_matrix", + "ebios_rm_study", + "created_at", + "updated_at", + ] + + class AssetWriteSerializer(BaseModelSerializer): ebios_rm_studies = serializers.PrimaryKeyRelatedField( many=True, @@ -260,6 +331,27 @@ class AssetReadSerializer(AssetWriteSerializer): type = serializers.CharField(source="get_type_display") +class AssetImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + parent_assets = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = Asset + fields = [ + "type", + "name", + "description", + "business_value", + "reference_link", + "security_objectives", + "disaster_recovery_objectives", + "parent_assets", + "folder", + "created_at", + "updated_at", + ] + + class ReferenceControlWriteSerializer(BaseModelSerializer): class Meta: model = ReferenceControl @@ -275,6 +367,33 @@ class Meta: exclude = ["translations"] +class ReferenceControlImportExportSerializer(BaseModelSerializer): + library = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = ReferenceControl + fields = [ + "ref_id", + "name", + "description", + "urn", + "provider", + "category", + "csf_function", + "typical_evidence", + "annotation", + "translations", + "locale", + "default_locale", + "folder", + "library", + "created_at", + "updated_at", + ] + + """class LibraryReadSerializer(BaseModelSerializer): class Meta: model = LoadedLibrary @@ -293,8 +412,6 @@ class Meta: model = Threat exclude = ["translations"] - # ["id", "folder", "ref_id", "name", "description", "provider"] # TODO: 
check why not all? - class ThreatReadSerializer(ReferentialSerializer): folder = FieldsRelatedField() @@ -305,6 +422,30 @@ class Meta: exclude = ["translations"] +class ThreatImportExportSerializer(BaseModelSerializer): + library = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = Threat + fields = [ + "created_at", + "updated_at", + "folder", + "urn", + "ref_id", + "provider", + "name", + "description", + "annotation", + "translations", + "locale", + "default_locale", + "library", + ] + + class RiskScenarioWriteSerializer(BaseModelSerializer): risk_matrix = serializers.PrimaryKeyRelatedField( read_only=True, source="risk_assessment.risk_matrix" @@ -342,6 +483,42 @@ class RiskScenarioReadSerializer(RiskScenarioWriteSerializer): owner = FieldsRelatedField(many=True) +class RiskScenarioImportExportSerializer(BaseModelSerializer): + threats = HashSlugRelatedField(slug_field="pk", many=True, read_only=True) + risk_assessment = HashSlugRelatedField(slug_field="pk", read_only=True) + vulnerabilities = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + assets = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + existing_applied_controls = HashSlugRelatedField( + slug_field="pk", read_only=True, many=True + ) + applied_controls = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = RiskScenario + fields = [ + "ref_id", + "name", + "description", + "risk_assessment", + "treatment", + "threats", + "vulnerabilities", + "assets", + "existing_controls", + "existing_applied_controls", + "applied_controls", + "current_proba", + "current_impact", + "residual_proba", + "residual_impact", + "strength_of_knowledge", + "justification", + "created_at", + "updated_at", + "qualifications", + ] + + class AppliedControlWriteSerializer(BaseModelSerializer): class Meta: model = AppliedControl @@ -375,6 +552,35 @@ class Meta: fields = ["name", "description", "folder"] +class AppliedControlImportExportSerializer(BaseModelSerializer): + reference_control = HashSlugRelatedField(slug_field="pk", read_only=True) + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + evidences = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = AppliedControl + fields = [ + "folder", + "ref_id", + "name", + "description", + "priority", + "reference_control", + "created_at", + "updated_at", + "category", + "csf_function", + "status", + "start_date", + "eta", + "expiry_date", + "link", + "effort", + "cost", + "evidences", + ] + + class PolicyWriteSerializer(AppliedControlWriteSerializer): class Meta: model = Policy @@ -531,6 +737,21 @@ class Meta: fields = "__all__" +class FolderImportExportSerializer(BaseModelSerializer): + parent_folder = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = Folder + fields = [ + "parent_folder", + "name", + "description", + "content_type", + "created_at", + "updated_at", + ] + + # Compliance Assessment @@ -548,6 +769,29 @@ class FrameworkWriteSerializer(FrameworkReadSerializer): pass +class FrameworkImportExportSerializer(BaseModelSerializer): + library = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + class Meta: + model = Framework + fields = [ + "urn", + "ref_id", + "name", + "library", + "min_score", + "max_score", + "implementation_groups_definition", + "provider", + "annotation", + "translations", + "locale", + "default_locale", + 
"created_at", + "updated_at", + ] + + class RequirementNodeReadSerializer(ReferentialSerializer): reference_controls = FieldsRelatedField(many=True) threats = FieldsRelatedField(many=True) @@ -588,6 +832,26 @@ class Meta: fields = "__all__" +class EvidenceImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + attachment = serializers.CharField(allow_blank=True) + size = serializers.CharField(source="get_size", read_only=True) + attachment_hash = serializers.CharField(read_only=True) + + class Meta: + model = Evidence + fields = [ + "folder", + "name", + "description", + "attachment", + "created_at", + "updated_at", + "size", + "attachment_hash", + ] + + class AttachmentUploadSerializer(serializers.Serializer): attachment = serializers.FileField(required=True) @@ -645,6 +909,35 @@ class Meta: fields = "__all__" +class ComplianceAssessmentImportExportSerializer(BaseModelSerializer): + framework = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + project = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = ComplianceAssessment + fields = [ + "ref_id", + "name", + "version", + "description", + "folder", + "project", + "eta", + "due_date", + "status", + "observation", + "framework", + "selected_implementation_groups", + "min_score", + "max_score", + "scores_definition", + "created_at", + "updated_at", + ] + + class RequirementAssessmentReadSerializer(BaseModelSerializer): class FilteredNodeSerializer(RequirementNodeReadSerializer): class Meta: @@ -723,6 +1016,37 @@ class Meta: fields = "__all__" +class RequirementAssessmentImportExportSerializer(BaseModelSerializer): + requirement = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + compliance_assessment = HashSlugRelatedField(slug_field="pk", read_only=True) + evidences = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + applied_controls = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = RequirementAssessment + fields = [ + "created_at", + "updated_at", + "eta", + "due_date", + "folder", + "status", + "result", + "score", + "is_scored", + "observation", + "compliance_assessment", + "requirement", + "selected", + "mapping_inference", + "answer", + "evidences", + "applied_controls", + ] + + class RequirementMappingSetWriteSerializer(RequirementMappingSetReadSerializer): pass diff --git a/backend/core/templates/snippets/req_node.html b/backend/core/templates/snippets/req_node.html index 5953d0304..16e4f3ff2 100644 --- a/backend/core/templates/snippets/req_node.html +++ b/backend/core/templates/snippets/req_node.html @@ -2,110 +2,125 @@
{# re-indentation-only hunk; the surrounding HTML markup is not recoverable from this copy — template logic: #}
{% if not node.requirement_node.assessable %}
  {{ node.requirement_node.display_long }}
  {% if node.bar_graph %}
    {% bar_graph assessments ancestors node.requirement_node %}
  {% endif %}
{% else %}
  {{ node.assessments.requirement }}
  {{ node.status }}
  {{ node.result }}
  {% if node.assessments.is_scored %}
    {{ node.assessments.score }}
  {% endif %}
  {% if node.assessments.requirement.description %}
    {{ node.assessments.requirement.get_description_translated }}
  {% endif %}
  {% if node.assessments.answer %}
    {% for question in node.assessments.answer.questions %}
      {{ question.text }}
      {% if question.answer %}
        {{ question.answer }}
      {% else %}
        {% trans "No answer" %}
      {% endif %}
    {% endfor %}
  {% endif %}
  {% if node.assessments.observation %}
    {% trans "Observation:" %}
    {{ node.assessments.observation }}
  {% endif %}
  {% if node.bar_graph %}
    {% bar_graph assessments ancestors node.requirement_node %}
  {% endif %}
  {% if node.direct_evidences %}
    {% trans "Associated evidence:" %}
    {% for evidence in node.direct_evidences %}
      {% if evidence.attachment %}
        • {{ evidence.name }}
      {% else %}
        • {{ evidence.name }}
      {% endif %}
    {% endfor %}
  {% endif %}
  {% if node.applied_controls %}
    {% trans "Applied controls:" %}
    {% trans "Evidence of applied controls:" %}
    {% for control in node.applied_controls %}
      • {{ control.measure.name }}: {{ control.measure.get_result_display }}
    {% endfor %}
    {% for control in node.applied_controls %}
      {% for evidence in control.evidences %}
        {% if evidence.attachment %}
          • {{ control.measure.name }}/{{ evidence.name }}
        {% else %}
          • {{ evidence.name }}
        {% endif %}
      {% endfor %}
    {% endfor %}
  {% endif %}
{% endif %}
{% for child_node in node.children %}
  {% include "snippets/req_node.html" with node=child_node %}
{% endfor %}
    diff --git a/backend/core/tests/fixtures.py b/backend/core/tests/fixtures.py index d21220d3f..03391e594 100644 --- a/backend/core/tests/fixtures.py +++ b/backend/core/tests/fixtures.py @@ -3,14 +3,19 @@ from core.models import ( Project, StoredLibrary, + FilteringLabel, + AppliedControl, ) from iam.models import Folder @pytest.fixture def domain_project_fixture(): + root_folder = Folder.objects.get(content_type=Folder.ContentType.ROOT) folder = Folder.objects.create( - name="test folder", description="test folder description" + parent_folder=root_folder, + name="test folder", + description="test folder description", ) project = Project.objects.create(name="test project", folder=folder) return project @@ -37,3 +42,28 @@ def iso27001_csf1_1_frameworks_fixture(): ) assert csf_1_1_library is not None csf_1_1_library.load() + + +@pytest.fixture +def filtering_labels(): + labels = [ + FilteringLabel.objects.create(label="critical"), + FilteringLabel.objects.create(label="internal"), + FilteringLabel.objects.create(label="external"), + ] + return labels + + +@pytest.fixture +def applied_controls(): + return [ + AppliedControl.objects.create( + name="Bastion", + ), + AppliedControl.objects.create( + name="Firewall", + ), + AppliedControl.objects.create( + name="IDS", + ), + ] diff --git a/backend/core/tests/test_vulnerability.py b/backend/core/tests/test_vulnerability.py new file mode 100644 index 000000000..78ede2681 --- /dev/null +++ b/backend/core/tests/test_vulnerability.py @@ -0,0 +1,117 @@ +import pytest +import hashlib +from django.core.exceptions import ValidationError +from django.contrib.auth import get_user_model +from iam.models import Folder + +from core.models import ( + Vulnerability, +) + +from core.serializers import VulnerabilityImportExportSerializer + +from .fixtures import * + +User = get_user_model() + + +@pytest.fixture +def valid_vulnerability_data(domain_project_fixture, applied_controls): + domain = Folder.objects.filter(content_type=Folder.ContentType.DOMAIN).first() + applied_controls = AppliedControl.objects.all() + return { + "ref_id": "VULN-2025-001", + "name": "SQL Injection in Login Form", + "description": "A SQL injection vulnerability was found in the login form", + "folder": str(domain.id), + "status": Vulnerability.Status.EXPLOITABLE, + "severity": 2, + "applied_controls": [control.id for control in applied_controls[:2]], + "created_at": "2025-01-13T10:00:00Z", + "updated_at": "2025-01-13T10:00:00Z", + } + + +@pytest.mark.django_db +class TestVulnerability: + @pytest.mark.usefixtures("domain_project_fixture", "applied_controls") + def test_basic_vulnerability_creation(self): + vulnerability = Vulnerability.objects.create( + name="SQL Injection in Login Form", + ) + + assert vulnerability is not None + assert vulnerability.name == "SQL Injection in Login Form" + assert vulnerability.ref_id == "" + assert vulnerability.description is None + assert vulnerability.folder == Folder.get_root_folder() + assert vulnerability.status == Vulnerability.Status.UNDEFINED + assert vulnerability.severity == -1 + assert vulnerability.applied_controls.count() == 0 + assert vulnerability.created_at is not None + assert vulnerability.updated_at is not None + + @pytest.mark.usefixtures("domain_project_fixture") + def test_vulnerability_creation_same_name_in_scope(self): + domain = Folder.objects.filter(content_type=Folder.ContentType.DOMAIN).first() + vulnerability = Vulnerability.objects.create( + name="SQL Injection in Login Form", folder=domain + ) + assert vulnerability is 
not None + assert vulnerability.folder == domain + + with pytest.raises(ValidationError): + Vulnerability.objects.create( + name="SQL Injection in Login Form", folder=domain + ) + + def test_vulnerability_import_export_valid_data(self, valid_vulnerability_data): + """Test serializer with valid data""" + serializer = VulnerabilityImportExportSerializer(data=valid_vulnerability_data) + assert serializer.is_valid(), f"Validation errors: {serializer.errors}" + assert serializer.errors == {} + + def test_missing_required_fields(self): + """Test with missing required fields""" + incomplete_data = {} + serializer = VulnerabilityImportExportSerializer(data=incomplete_data) + assert not serializer.is_valid() + for field in [ + "name", + ]: + assert field in serializer.errors + + def test_serialization(self, domain_project_fixture, applied_controls): + """Test serializing an existing vulnerability instance""" + folder = Folder.objects.filter(content_type=Folder.ContentType.DOMAIN).first() + vulnerability = Vulnerability.objects.create( + ref_id="VULN-2025-002", + name="XSS in Comments", + description="Cross-site scripting vulnerability in comments section", + folder=folder, + status="open", + severity=3, + created_at="2025-01-13T10:00:00Z", + updated_at="2025-01-13T10:00:00Z", + ) + vulnerability.applied_controls.set(applied_controls[:2]) + + serializer = VulnerabilityImportExportSerializer(vulnerability) + data = serializer.data + + assert data["ref_id"] == "VULN-2025-002" + assert data["name"] == "XSS in Comments" + assert data["severity"] == 3 + assert ( + data["folder"] + == hashlib.sha256(str(getattr(folder, "pk")).encode()).hexdigest()[:12] + ) + assert len(data["applied_controls"]) == 2 + assert isinstance(data["severity"], int) + + def test_empty_relationships(self, valid_vulnerability_data): + """Test with empty relationships""" + valid_vulnerability_data.update({"applied_controls": []}) + serializer = VulnerabilityImportExportSerializer(data=valid_vulnerability_data) + assert serializer.is_valid(), f"Validation errors: {serializer.errors}" + assert serializer.errors == {} diff --git a/backend/core/views.py b/backend/core/views.py index 4636b292c..665105c2b 100644 --- a/backend/core/views.py +++ b/backend/core/views.py @@ -1,24 +1,25 @@ import csv +import gzip +import json import mimetypes import re +import os import tempfile import uuid import zipfile from datetime import date, datetime, timedelta +from typing import Dict, Any, List, Tuple import time -from django.views.generic import detail import pytz -from typing import Any, Tuple from uuid import UUID from itertools import cycle import django_filters as df -from ciso_assistant.settings import BUILD, VERSION, EMAIL_HOST, EMAIL_HOST_RESCUE +from ciso_assistant.settings import EMAIL_HOST, EMAIL_HOST_RESCUE, VERSION import shutil from pathlib import Path import humanize -from django.http import StreamingHttpResponse from wsgiref.util import FileWrapper import io @@ -28,6 +29,8 @@ from docxtpl import DocxTemplate from .generators import gen_audit_context +from django.utils import timezone +from django.utils.text import slugify from django.utils.decorators import method_decorator from django.views.decorators.cache import cache_page from django.views.decorators.vary import vary_on_cookie @@ -35,13 +38,14 @@ from django.db.models import F, Q +from django.apps import apps from django.contrib.auth.models import Permission from django.contrib.auth import get_user_model from django.conf import settings from django.core.files.storage import 
default_storage -from django.db import models +from django.db import models, transaction from django.forms import ValidationError -from django.http import FileResponse, HttpResponse +from django.http import FileResponse, HttpResponse, StreamingHttpResponse from django.middleware import csrf from django.template.loader import render_to_string from django.utils.functional import Promise @@ -55,13 +59,17 @@ permission_classes, renderer_classes, ) -from rest_framework.parsers import FileUploadParser +from rest_framework.parsers import ( + FileUploadParser, + MultiPartParser, + JSONParser, + FormParser, +) from rest_framework.renderers import JSONRenderer from rest_framework.request import Request from rest_framework.response import Response from rest_framework.utils.serializer_helpers import ReturnDict from rest_framework.views import APIView -from rest_framework.permissions import AllowAny from weasyprint import HTML @@ -78,12 +86,28 @@ from ebios_rm.models import ( EbiosRMStudy, - OperationalScenario, + FearedEvent, + RoTo, + StrategicScenario, + Stakeholder, + AttackPath, ) +from tprm.models import Entity + from .models import * from .serializers import * +from serdes.utils import ( + get_domain_export_objects, + import_export_serializer_class, + topological_sort, + build_dependency_graph, + get_self_referencing_field, + sort_objects_by_self_reference, +) +from serdes.serializers import ExportSerializer + import structlog logger = structlog.get_logger(__name__) @@ -455,7 +479,7 @@ def graph(self, request): "value": "parent", } ) - meta = {"display_name": f"Assets Explorer"} + meta = {"display_name": "Assets Explorer"} return Response( {"nodes": nodes, "links": links, "categories": categories, "meta": meta} @@ -1802,6 +1826,7 @@ class FolderViewSet(BaseModelViewSet): model = Folder filterset_class = FolderFilter search_fields = ["ref_id"] + batch_size = 100 # Configurable batch size for processing domain import def perform_create(self, serializer): """ @@ -1964,6 +1989,754 @@ def my_assignments(self, request): } ) + @action(detail=True, methods=["get"]) + def export(self, request, pk): + include_attachments = True + instance = self.get_object() + + logger.info( + "Starting domain export", + domain_id=instance.id, + domain_name=instance.name, + include_attachments=include_attachments, + user=request.user.username, + ) + + objects = get_domain_export_objects(instance) + + logger.debug( + "Retrieved domain objects for export", + object_types=list(objects.keys()), + total_objects=sum(len(queryset) for queryset in objects.values()), + objects_per_model={ + model: len(queryset) for model, queryset in objects.items() + }, + ) + + # Create in-memory zip file + zip_buffer = io.BytesIO() + + with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zipf: + if include_attachments: + evidences = objects.get("evidence", Evidence.objects.none()).filter( + attachment__isnull=False + ) + logger.info( + "Processing evidence attachments", + total_evidences=evidences.count(), + domain_id=instance.id, + ) + + for evidence in evidences: + if evidence.attachment and default_storage.exists( + evidence.attachment.name + ): + # Read file directly into memory + with default_storage.open(evidence.attachment.name) as file: + file_content = file.read() + # Write the file content directly to the zip + zipf.writestr( + os.path.join( + "attachments", + os.path.basename(evidence.attachment.name), + ), + file_content, + ) + + # Add the JSON dump to the zip file + dumpfile_name = ( + 
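+            # the filename embeds the slugified domain name and an export
+            # timestamp, so successive dumps sort chronologically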
f"ciso-assistant-{slugify(instance.name)}-domain-{timezone.now()}" + ) + dump_data = ExportSerializer.dump_data(scope=[*objects.values()]) + + logger.debug( + "Adding JSON dump to zip", + json_size=len(json.dumps(dump_data).encode("utf-8")), + filename=f"{dumpfile_name}.json", + ) + + zipf.writestr("data.json", json.dumps(dump_data).encode("utf-8")) + + # Reset buffer position to the start + zip_buffer.seek(0) + final_size = len(zip_buffer.getvalue()) + + # Create the response with the in-memory zip file + response = HttpResponse(zip_buffer.getvalue(), content_type="application/zip") + response["Content-Disposition"] = f'attachment; filename="{dumpfile_name}.zip"' + + logger.info( + "Domain export completed successfully", + domain_id=instance.id, + domain_name=instance.name, + zip_size=final_size, + filename=f"{dumpfile_name}.zip", + ) + + return response + + @action( + detail=False, + methods=["post"], + url_path="import", + parser_classes=(FileUploadParser,), + ) + def import_domain(self, request): + """Handle file upload and initiate import process.""" + try: + domain_name = request.headers.get( + "X-CISOAssistantDomainName", str(uuid.uuid4()) + ) + parsed_data = self._process_uploaded_file(request.data["file"]) + result = self._import_objects(parsed_data, domain_name) + return Response(result, status=status.HTTP_200_OK) + + except KeyError as e: + logger.error("No file provided in the request", exc_info=e) + return Response( + {"errors": ["No file provided"]}, status=status.HTTP_400_BAD_REQUEST + ) + + except json.JSONDecodeError as e: + logger.error("Invalid JSON format in uploaded file", exc_info=e) + return Response( + {"errors": ["Invalid JSON format"]}, status=status.HTTP_400_BAD_REQUEST + ) + + def _process_uploaded_file(self, dump_file: str | Path) -> Any: + """Process the uploaded file and return parsed data.""" + if not zipfile.is_zipfile(dump_file): + logger.error("Invalid ZIP file format") + raise ValidationError({"file": "invalidZipFileFormat"}) + + with zipfile.ZipFile(dump_file, mode="r") as zipf: + if "data.json" not in zipf.namelist(): + logger.error("No data.json file found in uploaded file") + raise ValidationError({"file": "noDataJsonFileFound"}) + infolist = zipf.infolist() + directories = list(set([Path(f.filename).parent.name for f in infolist])) + decompressed_data = zipf.read("data.json") + # Decode bytes to string if necessary + if isinstance(decompressed_data, bytes): + decompressed_data = decompressed_data.decode("utf-8") + try: + json_dump = json.loads(decompressed_data) + import_version = json_dump["meta"]["media_version"] + except json.JSONDecodeError as e: + logger.error("Invalid JSON format in uploaded file", exc_info=e) + raise + if not "objects" in json_dump: + raise ValidationError("badly formatted json") + if not import_version == VERSION: + logger.error( + f"Import version {import_version} not compatible with current version {VERSION}" + ) + raise ValidationError( + {"file": "importVersionNotCompatibleWithCurrentVersion"} + ) + if "attachments" in directories: + attachments = { + f for f in infolist if Path(f.filename).parent.name == "attachments" + } + logger.info( + "Attachments found in uploaded file", + attachments_count=len(attachments), + ) + for attachment in attachments: + try: + content = zipf.read(attachment) + current_name = Path(attachment.filename).name + new_name = default_storage.save( + current_name, io.BytesIO(content) + ) + if new_name != current_name: + for x in json_dump["objects"]: + if ( + x["model"] == "core.evidence" + and 
x["fields"]["attachment"] == current_name + ): + x["fields"]["attachment"] = new_name + + except Exception as e: + logger.error("Error extracting attachment", exc_info=e) + + return json_dump + + def _get_models_map(self, objects): + """Build a map of model names to model classes.""" + model_names = {obj["model"] for obj in objects} + return {name: apps.get_model(name) for name in model_names} + + def _resolve_dependencies(self, all_models): + """Resolve model dependencies and detect cycles.""" + logger.debug("Resolving model dependencies", all_models=all_models) + + graph = build_dependency_graph(all_models) + + logger.debug("Dependency graph", graph=graph) + + try: + return topological_sort(graph) + except ValueError as e: + logger.error("Cyclic dependency detected", error=str(e)) + raise ValidationError({"error": "Cyclic dependency detected"}) + + def _import_objects(self, parsed_data: dict, domain_name: str): + """ + Import and validate objects using appropriate serializers. + Handles both validation and creation in separate phases within a transaction. + """ + validation_errors = [] + required_libraries = [] + missing_libraries = [] + link_dump_database_ids = {} + try: + objects = parsed_data.get("objects", None) + if not objects: + logger.error("No objects found in the dump") + raise ValidationError({"error": "No objects found in the dump"}) + + # Validate models and check for domain + models_map = self._get_models_map(objects) + if Folder in models_map.values(): + logger.error("Dump contains a domain") + raise ValidationError({"error": "Dump contains a domain"}) + + # Validation phase (outside transaction since it doesn't modify database) + creation_order = self._resolve_dependencies(list(models_map.values())) + + logger.debug("Resolved creation order", creation_order=creation_order) + + logger.debug("Starting objects validation", objects_count=len(objects)) + + for model in creation_order: + self._validate_model_objects( + model=model, + objects=objects, + validation_errors=validation_errors, + required_libraries=required_libraries, + ) + + logger.debug("required_libraries", required_libraries=required_libraries) + + if validation_errors: + logger.error( + "Failed to validate objets", validation_errors=validation_errors + ) + raise ValidationError({"validation_errors": validation_errors}) + + # Check for missing libraries + for library in required_libraries: + if not LoadedLibrary.objects.filter(urn=library).exists(): + missing_libraries.append(library) + + logger.debug("missing_libraries", missing_libraries=missing_libraries) + + # Creation phase - wrap in transaction + with transaction.atomic(): + # Create base folder and store its ID + base_folder = Folder.objects.create( + name=domain_name, content_type=Folder.ContentType.DOMAIN + ) + link_dump_database_ids["base_folder"] = base_folder + + logger.info( + "Starting objects creation", + objects_count=len(objects), + creation_order=creation_order, + ) + # Create all objects within the transaction + for model in creation_order: + self._create_model_objects( + model=model, + objects=objects, + link_dump_database_ids=link_dump_database_ids, + ) + + return {"message": "Import successful"} + + except ValidationError as e: + if missing_libraries: + logger.warning(f"Missing libraries: {missing_libraries}") + raise ValidationError({"missing_libraries": missing_libraries}) + logger.exception(f"Failed to import objects: {str(e)}") + raise ValidationError({"non_field_errors": "errorOccuredDuringImport"}) + + def _validate_model_objects( + 
self, model, objects, validation_errors, required_libraries + ): + """Validate all objects for a model before creation.""" + model_name = f"{model._meta.app_label}.{model._meta.model_name}" + model_objects = [obj for obj in objects if obj["model"] == model_name] + + if not model_objects: + return + + # Process validation in batches + for i in range(0, len(model_objects), self.batch_size): + batch = model_objects[i : i + self.batch_size] + self._validate_batch( + model=model, + batch=batch, + validation_errors=validation_errors, + required_libraries=required_libraries, + ) + + def _validate_batch(self, model, batch, validation_errors, required_libraries): + """Validate a batch of objects.""" + model_name = f"{model._meta.app_label}.{model._meta.model_name}" + + for obj in batch: + obj_id = obj.get("id") + fields = obj.get("fields", {}).copy() + + try: + # Handle library objects + if model == LoadedLibrary: + continue + if fields.get("library"): + required_libraries.append(fields["library"]) + logger.info( + "Adding library to required libraries", urn=fields["library"] + ) + continue + + # Validate using serializer + SerializerClass = import_export_serializer_class(model) + serializer = SerializerClass(data=fields) + + if not serializer.is_valid(): + validation_errors.append( + { + "model": model_name, + "id": obj_id, + "errors": serializer.errors, + } + ) + + except Exception as e: + logger.error( + f"Error validating object {obj_id} in {model_name}: {str(e)}", + exc_info=e, + ) + validation_errors.append( + { + "model": model_name, + "id": obj_id, + "errors": [str(e)], + } + ) + + def _create_model_objects(self, model, objects, link_dump_database_ids): + """Create all objects for a model after validation.""" + logger.debug("Creating objects for model", model=model) + + model_name = f"{model._meta.app_label}.{model._meta.model_name}" + model_objects = [obj for obj in objects if obj["model"] == model_name] + + logger.debug("Model objects", model=model, count=len(model_objects)) + + if not model_objects: + return + + # Handle self-referencing dependencies + self_ref_field = get_self_referencing_field(model) + if self_ref_field: + try: + model_objects = sort_objects_by_self_reference( + model_objects, self_ref_field + ) + except ValueError as e: + logger.error(f"Cyclic dependency detected in {model_name}: {str(e)}") + raise ValidationError( + {"error": f"Cyclic dependency detected in {model_name}"} + ) + + # Process creation in batches + for i in range(0, len(model_objects), self.batch_size): + batch = model_objects[i : i + self.batch_size] + self._create_batch( + model=model, + batch=batch, + link_dump_database_ids=link_dump_database_ids, + ) + + def _create_batch(self, model, batch, link_dump_database_ids): + """Create a batch of objects with proper relationship handling.""" + # Create all objects in the batch within a single transaction + with transaction.atomic(): + for obj in batch: + obj_id = obj.get("id") + fields = obj.get("fields", {}).copy() + + try: + # Handle library objects + if fields.get("library") or model == LoadedLibrary: + logger.info(f"Skipping creation of library object {obj_id}") + link_dump_database_ids[obj_id] = fields.get("urn") + continue + + # Handle folder reference + if fields.get("folder"): + fields["folder"] = link_dump_database_ids.get("base_folder") + + # Process model-specific relationships + many_to_many_map_ids = {} + fields = self._process_model_relationships( + model=model, + fields=fields, + link_dump_database_ids=link_dump_database_ids, + 
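+                        # many_to_many_map_ids is filled in place with the
+                        # m2m ids to attach once the object exists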
many_to_many_map_ids=many_to_many_map_ids,
+                    )
+
+                    try:
+                        # Run clean to validate unique constraints
+                        model(**fields).clean()
+                    except ValidationError as e:
+                        for field, error in e.error_dict.items():
+                            fields[field] = f"{fields[field]} {uuid.uuid4()}"
+
+                    logger.debug("Creating object", fields=fields)
+
+                    # Create the object
+                    obj_created = model.objects.create(**fields)
+                    link_dump_database_ids[obj_id] = obj_created.id
+
+                    # Handle many-to-many relationships
+                    self._set_many_to_many_relations(
+                        model=model,
+                        obj=obj_created,
+                        many_to_many_map_ids=many_to_many_map_ids,
+                    )
+
+                except Exception as e:
+                    logger.error(f"Error creating object {obj_id}: {str(e)}")
+                    # This will trigger a rollback of the entire batch
+                    raise ValidationError(
+                        f"Error creating {model._meta.model_name}: {str(e)}"
+                    )
+
+    def _process_model_relationships(
+        self,
+        model,
+        fields,
+        link_dump_database_ids,
+        many_to_many_map_ids,
+    ):
+        """Process model-specific relationships."""
+
+        def get_mapped_ids(
+            ids: List[str], link_dump_database_ids: Dict[str, str]
+        ) -> List[str]:
+            return [link_dump_database_ids.get(id, "") for id in ids]
+
+        model_name = model._meta.model_name
+        _fields = fields.copy()
+
+        logger.debug(
+            "Processing model relationships", model=model_name, _fields=_fields
+        )
+
+        match model_name:
+            case "asset":
+                many_to_many_map_ids["parent_ids"] = get_mapped_ids(
+                    _fields.pop("parent_assets", []), link_dump_database_ids
+                )
+
+            case "riskassessment":
+                _fields["project"] = Project.objects.get(
+                    id=link_dump_database_ids.get(_fields["project"])
+                )
+                _fields["risk_matrix"] = RiskMatrix.objects.get(
+                    urn=_fields.get("risk_matrix")
+                )
+                _fields["ebios_rm_study"] = (
+                    EbiosRMStudy.objects.get(
+                        id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                    )
+                    if _fields.get("ebios_rm_study")
+                    else None
+                )
+
+            case "complianceassessment":
+                _fields["project"] = Project.objects.get(
+                    id=link_dump_database_ids.get(_fields["project"])
+                )
+                _fields["framework"] = Framework.objects.get(urn=_fields["framework"])
+
+            case "appliedcontrol":
+                many_to_many_map_ids["evidence_ids"] = get_mapped_ids(
+                    _fields.pop("evidences", []), link_dump_database_ids
+                )
+                ref_control_id = link_dump_database_ids.get(
+                    _fields["reference_control"]
+                )
+                _fields["reference_control"] = ReferenceControl.objects.filter(
+                    urn=ref_control_id
+                ).first()
+
+            case "evidence":
+                _fields.pop("size", None)
+                _fields.pop("attachment_hash", None)
+
+            case "requirementassessment":
+                _fields["requirement"] = RequirementNode.objects.get(
+                    urn=_fields.get("requirement")
+                )
+                _fields["compliance_assessment"] = ComplianceAssessment.objects.get(
+                    id=link_dump_database_ids.get(_fields["compliance_assessment"])
+                )
+                many_to_many_map_ids.update(
+                    {
+                        "applied_controls": get_mapped_ids(
+                            _fields.pop("applied_controls", []), link_dump_database_ids
+                        ),
+                        "evidence_ids": get_mapped_ids(
+                            _fields.pop("evidences", []), link_dump_database_ids
+                        ),
+                    }
+                )
+
+            case "vulnerability":
+                many_to_many_map_ids["applied_controls"] = get_mapped_ids(
+                    _fields.pop("applied_controls", []), link_dump_database_ids
+                )
+
+            case "riskscenario":
+                _fields["risk_assessment"] = RiskAssessment.objects.get(
+                    id=link_dump_database_ids.get(_fields["risk_assessment"])
+                )
+                # Process all related fields at once; each source field maps
+                # to the exact key consumed by _set_many_to_many_relations
+                related_field_keys = {
+                    "threats": "threat_ids",
+                    "vulnerabilities": "vulnerability_ids",
+                    "assets": "asset_ids",
+                    "applied_controls": "applied_control_ids",
+                    "existing_applied_controls": "existing_applied_control_ids",
+                }
+                for field, map_key in related_field_keys.items():
+                    many_to_many_map_ids[map_key] = get_mapped_ids(
+                        _fields.pop(field, []), link_dump_database_ids
+                    )
+
+            case "entity":
+                _fields.pop("owned_folders", None)
+
+            case "ebiosrmstudy":
+                _fields.update(
+                    {
+                        "risk_matrix": RiskMatrix.objects.get(
+                            urn=_fields.get("risk_matrix")
+                        ),
+                        "reference_entity": Entity.objects.get(
+                            id=link_dump_database_ids.get(_fields["reference_entity"])
+                        ),
+                    }
+                )
+                many_to_many_map_ids.update(
+                    {
+                        "asset_ids": get_mapped_ids(
+                            _fields.pop("assets", []), link_dump_database_ids
+                        ),
+                        "compliance_assessment_ids": get_mapped_ids(
+                            _fields.pop("compliance_assessments", []),
+                            link_dump_database_ids,
+                        ),
+                    }
+                )
+
+            case "fearedevent":
+                _fields["ebios_rm_study"] = EbiosRMStudy.objects.get(
+                    id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                )
+                many_to_many_map_ids.update(
+                    {
+                        "qualifications_urn": get_mapped_ids(
+                            _fields.pop("qualifications", []), link_dump_database_ids
+                        ),
+                        "asset_ids": get_mapped_ids(
+                            _fields.pop("assets", []), link_dump_database_ids
+                        ),
+                    }
+                )
+
+            case "roto":
+                _fields["ebios_rm_study"] = EbiosRMStudy.objects.get(
+                    id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                )
+                many_to_many_map_ids["feared_event_ids"] = get_mapped_ids(
+                    _fields.pop("feared_events", []), link_dump_database_ids
+                )
+
+            case "stakeholder":
+                _fields.update(
+                    {
+                        "ebios_rm_study": EbiosRMStudy.objects.get(
+                            id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                        ),
+                        "entity": Entity.objects.get(
+                            id=link_dump_database_ids.get(_fields["entity"])
+                        ),
+                    }
+                )
+                many_to_many_map_ids["applied_controls"] = get_mapped_ids(
+                    _fields.pop("applied_controls", []), link_dump_database_ids
+                )
+
+            case "strategicscenario":
+                _fields.update(
+                    {
+                        "ebios_rm_study": EbiosRMStudy.objects.get(
+                            id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                        ),
+                        "ro_to_couple": RoTo.objects.get(
+                            id=link_dump_database_ids.get(_fields["ro_to_couple"])
+                        ),
+                    }
+                )
+
+            case "attackpath":
+                _fields.update(
+                    {
+                        "ebios_rm_study": EbiosRMStudy.objects.get(
+                            id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                        ),
+                        "strategic_scenario": StrategicScenario.objects.get(
+                            id=link_dump_database_ids.get(_fields["strategic_scenario"])
+                        ),
+                    }
+                )
+                many_to_many_map_ids["stakeholder_ids"] = get_mapped_ids(
+                    _fields.pop("stakeholders", []), link_dump_database_ids
+                )
+
+            case "operationalscenario":
+                _fields.update(
+                    {
+                        "ebios_rm_study": EbiosRMStudy.objects.get(
+                            id=link_dump_database_ids.get(_fields["ebios_rm_study"])
+                        ),
+                        "attack_path": AttackPath.objects.get(
+                            id=link_dump_database_ids.get(_fields["attack_path"])
+                        ),
+                    }
+                )
+                many_to_many_map_ids["threat_ids"] = get_mapped_ids(
+                    _fields.pop("threats", []), link_dump_database_ids
+                )
+
+        return _fields
+
+    def _set_many_to_many_relations(self, model, obj, many_to_many_map_ids):
+        """Set many-to-many relationships after object creation."""
+        model_name = model._meta.model_name
+
+        match model_name:
+            case "asset":
+                if parent_ids := many_to_many_map_ids.get("parent_ids"):
+                    obj.parent_assets.set(Asset.objects.filter(id__in=parent_ids))
+
+            case "appliedcontrol":
+                if evidence_ids := many_to_many_map_ids.get("evidence_ids"):
+                    obj.evidences.set(Evidence.objects.filter(id__in=evidence_ids))
+
+            case "requirementassessment":
+                if applied_control_ids := many_to_many_map_ids.get("applied_controls"):
+                    obj.applied_controls.set(
+                        AppliedControl.objects.filter(id__in=applied_control_ids)
+                    )
+                if evidence_ids := many_to_many_map_ids.get("evidence_ids"):
obj.evidences.set(Evidence.objects.filter(id__in=evidence_ids)) + + case "vulnerability": + if applied_control_ids := many_to_many_map_ids.get("applied_controls"): + obj.applied_controls.set( + AppliedControl.objects.filter(id__in=applied_control_ids) + ) + + case "riskscenario": + if threat_ids := many_to_many_map_ids.get("threat_ids"): + uuids, urns = self._split_uuids_urns(threat_ids) + obj.threats.set( + Threat.objects.filter(Q(id__in=uuids) | Q(urn__in=urns)) + ) + + for field, model_class in { + "vulnerability_ids": (Vulnerability, "vulnerabilities"), + "asset_ids": (Asset, "assets"), + "applied_control_ids": (AppliedControl, "applied_controls"), + "existing_applied_control_ids": ( + AppliedControl, + "existing_applied_controls", + ), + }.items(): + if ids := many_to_many_map_ids.get(field): + getattr(obj, model_class[1]).set( + model_class[0].objects.filter(id__in=ids) + ) + + case "ebiosrmstudy": + if asset_ids := many_to_many_map_ids.get("asset_ids"): + obj.assets.set(Asset.objects.filter(id__in=asset_ids)) + if compliance_assessment_ids := many_to_many_map_ids.get( + "compliance_assessment_ids" + ): + obj.compliance_assessments.set( + ComplianceAssessment.objects.filter( + id__in=compliance_assessment_ids + ) + ) + + case "fearedevent": + if qualifications_urn := many_to_many_map_ids.get("qualifications_urn"): + obj.qualifications.set( + Qualification.objects.filter(urn__in=qualifications_urn) + ) + if asset_ids := many_to_many_map_ids.get("asset_ids"): + obj.assets.set(Asset.objects.filter(id__in=asset_ids)) + + case "roto": + if feared_event_ids := many_to_many_map_ids.get("feared_event_ids"): + obj.feared_events.set( + FearedEvent.objects.filter(id__in=feared_event_ids) + ) + + case "stakeholder": + if applied_control_ids := many_to_many_map_ids.get("applied_controls"): + obj.applied_controls.set( + AppliedControl.objects.filter(id__in=applied_control_ids) + ) + + case "attackpath": + if stakeholder_ids := many_to_many_map_ids.get("stakeholder_ids"): + obj.stakeholders.set( + Stakeholder.objects.filter(id__in=stakeholder_ids) + ) + + case "operationalscenario": + if threat_ids := many_to_many_map_ids.get("threat_ids"): + uuids, urns = self._split_uuids_urns(threat_ids) + obj.threats.set( + Threat.objects.filter(Q(id__in=uuids) | Q(urn__in=urns)) + ) + + def _split_uuids_urns(self, ids: List[str]) -> Tuple[List[str], List[str]]: + """Split a list of strings into UUIDs and URNs.""" + uuids = [] + urns = [] + for item in ids: + try: + uuid = UUID(str(item)) + uuids.append(uuid) + except ValueError: + urns.append(item) + return uuids, urns + class UserPreferencesView(APIView): permission_classes = [permissions.IsAuthenticated] @@ -2643,19 +3416,14 @@ def export(self, request, pk): with zipfile.ZipFile(zip_name, "w") as zipf: for evidence in evidences: if evidence.attachment: - with tempfile.NamedTemporaryFile(delete=True) as tmp: - # Download the attachment to the temporary file - if default_storage.exists(evidence.attachment.name): - file = default_storage.open(evidence.attachment.name) - tmp.write(file.read()) - tmp.flush() - zipf.write( - tmp.name, - os.path.join( - "evidences", - os.path.basename(evidence.attachment.name), - ), - ) + if default_storage.exists(evidence.attachment.name): + zipf.writestr( + os.path.join( + "evidences", + os.path.basename(evidence.attachment.name), + ), + default_storage.open(evidence.attachment.name).read(), + ) zipf.writestr("index.html", index_content) response = FileResponse(open(zip_name, "rb"), as_attachment=True) diff --git 
a/backend/ebios_rm/serializers.py b/backend/ebios_rm/serializers.py index 65d9d3610..c9d6ba516 100644 --- a/backend/ebios_rm/serializers.py +++ b/backend/ebios_rm/serializers.py @@ -1,7 +1,7 @@ from core.serializers import ( BaseModelSerializer, - FieldsRelatedField, ) +from core.serializer_fields import FieldsRelatedField, HashSlugRelatedField from core.models import RiskMatrix from .models import ( EbiosRMStudy, @@ -49,6 +49,38 @@ class Meta: fields = "__all__" +class EbiosRMStudyImportExportSerializer(BaseModelSerializer): + risk_matrix = serializers.SlugRelatedField(slug_field="urn", read_only=True) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + assets = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + compliance_assessments = HashSlugRelatedField( + slug_field="pk", read_only=True, many=True + ) + reference_entity = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = EbiosRMStudy + fields = [ + "ref_id", + "name", + "description", + "eta", + "due_date", + "version", + "status", + "observation", + "meta", + "assets", + "compliance_assessments", + "folder", + "risk_matrix", + "reference_entity", + "created_at", + "updated_at", + ] + + class FearedEventWriteSerializer(BaseModelSerializer): class Meta: model = FearedEvent @@ -67,6 +99,33 @@ class Meta: fields = "__all__" +class FearedEventImportExportSerializer(BaseModelSerializer): + qualifications = serializers.SlugRelatedField( + slug_field="urn", many=True, read_only=True + ) + + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + assets = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = FearedEvent + fields = [ + "ref_id", + "name", + "description", + "gravity", + "is_selected", + "justification", + "ebios_rm_study", + "qualifications", + "assets", + "folder", + "created_at", + "updated_at", + ] + + class RoToWriteSerializer(BaseModelSerializer): class Meta: model = RoTo @@ -89,6 +148,29 @@ class Meta: fields = "__all__" +class RoToImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + feared_events = HashSlugRelatedField(slug_field="pk", many=True, read_only=True) + + class Meta: + model = RoTo + fields = [ + "risk_origin", + "target_objective", + "motivation", + "resources", + "activity", + "is_selected", + "justification", + "ebios_rm_study", + "feared_events", + "folder", + "created_at", + "updated_at", + ] + + class StakeholderWriteSerializer(BaseModelSerializer): current_criticality = serializers.IntegerField(read_only=True) residual_criticality = serializers.IntegerField(read_only=True) @@ -125,6 +207,34 @@ class Meta: fields = "__all__" +class StakeholderImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + entity = HashSlugRelatedField(slug_field="pk", read_only=True) + applied_controls = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = Stakeholder + fields = [ + "created_at", + "updated_at", + "folder", + "ebios_rm_study", + "entity", + "category", + "current_dependency", + "current_penetration", + "current_maturity", + "current_trust", + "residual_dependency", + "residual_penetration", + "residual_maturity", + "residual_trust", + 
"is_selected", + "applied_controls", + ] + + class StrategicScenarioWriteSerializer(BaseModelSerializer): class Meta: model = StrategicScenario @@ -143,6 +253,25 @@ class Meta: fields = "__all__" +class StrategicScenarioImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + ro_to_couple = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = StrategicScenario + fields = [ + "ref_id", + "name", + "description", + "ebios_rm_study", + "ro_to_couple", + "folder", + "created_at", + "updated_at", + ] + + class AttackPathWriteSerializer(BaseModelSerializer): class Meta: model = AttackPath @@ -162,6 +291,29 @@ class Meta: fields = "__all__" +class AttackPathImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + strategic_scenario = HashSlugRelatedField(slug_field="pk", read_only=True) + stakeholders = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = AttackPath + fields = [ + "ref_id", + "name", + "description", + "ebios_rm_study", + "strategic_scenario", + "stakeholders", + "folder", + "is_selected", + "justification", + "created_at", + "updated_at", + ] + + class OperationalScenarioWriteSerializer(BaseModelSerializer): class Meta: model = OperationalScenario @@ -184,3 +336,25 @@ class OperationalScenarioReadSerializer(BaseModelSerializer): class Meta: model = OperationalScenario fields = "__all__" + + +class OperationalScenarioImportExportSerializer(BaseModelSerializer): + ebios_rm_study = HashSlugRelatedField(slug_field="pk", read_only=True) + attack_path = HashSlugRelatedField(slug_field="pk", read_only=True) + threats = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + + class Meta: + model = OperationalScenario + fields = [ + "operating_modes_description", + "likelihood", + "is_selected", + "justification", + "ebios_rm_study", + "attack_path", + "threats", + "folder", + "created_at", + "updated_at", + ] diff --git a/backend/library/serializers.py b/backend/library/serializers.py index 8c6ff0294..a3e91fc59 100644 --- a/backend/library/serializers.py +++ b/backend/library/serializers.py @@ -1,7 +1,7 @@ from core.models import StoredLibrary, LoadedLibrary from rest_framework import serializers -from core.serializers import ReferentialSerializer -from core.serializer_fields import FieldsRelatedField +from core.serializers import ReferentialSerializer, BaseModelSerializer +from core.serializer_fields import FieldsRelatedField, HashSlugRelatedField """class LibraryObjectSerializer(serializers.Serializer): type = serializers.ChoiceField( @@ -48,6 +48,34 @@ class Meta: exclude = ["translations"] +class LoadedLibraryImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + dependencies = HashSlugRelatedField(slug_field="pk", read_only=True, many=True) + + class Meta: + model = LoadedLibrary + fields = [ + "created_at", + "updated_at", + "version", + "folder", + "urn", + "ref_id", + "provider", + "name", + "description", + "annotation", + "locale", + "packager", + "publication_date", + "builtin", + "objects_meta", + "translations", + "dependencies", + "copyright", + ] + + class LoadedLibraryDetailedSerializer(ReferentialSerializer): locales = 
serializers.ListField(source="get_locales", read_only=True) dependencies = FieldsRelatedField(many=True, fields=["urn", "str", "name"]) diff --git a/backend/serdes/serializers.py b/backend/serdes/serializers.py index c96af0977..e36a0b23e 100644 --- a/backend/serdes/serializers.py +++ b/backend/serdes/serializers.py @@ -1,6 +1,21 @@ -from rest_framework import serializers +""" +Serializers for handling data import and export operations. + +This module provides a set of serializers for managing data backup and restoration, +with support for versioning and field validation. It handles the serialization +of Django model instances to a portable format and back. +""" + import re +from django.utils import timezone +from django.conf import settings +from django.db.models.query import QuerySet +from rest_framework import serializers +from hashlib import sha256 + +from .utils import app_dot_model, import_export_serializer_class + class LoadBackupSerializer(serializers.Serializer): file = serializers.Field @@ -10,11 +25,38 @@ class Meta: class MetaSerializer(serializers.Serializer): + """ + Serializer for backup metadata information. + + Handles the metadata section of backups, including version information + and timestamp data. + + Attributes: + media_version (str): Version of CISO Assistant at the time the backup was created. + exported_at (str): ISO 8601 format timestamp indicating when the backup was created. + """ + media_version = serializers.CharField() exported_at = serializers.CharField() class ObjectSerializer(serializers.Serializer): + """ + Serializer for individual model instances in the backup. + + Handles the serialization of individual model instances, including their + identifiers and field data. Ensures field names follow the required pattern. + + Attributes: + model (str): Dotted path string identifying the Django model (e.g., 'app.Model'). + id (str): String representation of the instance's primary key or its backup local identifier. + fields (dict): Dictionary containing the instance's field values. + + Raises: + ValidationError: If any field name doesn't match the required pattern of + lowercase letters and underscores. + """ + model = serializers.CharField() id = serializers.CharField() fields = serializers.DictField( @@ -37,3 +79,59 @@ def validate_fields(self, value): class ExportSerializer(serializers.Serializer): meta = MetaSerializer() objects = ObjectSerializer(many=True) + + @staticmethod + def dump_data(scope: list[QuerySet]) -> dict: + """ + Serialize multiple querysets into a complete backup format. + + This method creates a full backup representation including metadata + and serialized model instances from multiple querysets. + + Args: + scope (list[QuerySet]): List of querysets to be included in the backup. + + Returns: + dict: A dictionary containing: + - meta: Dictionary with version and timestamp information + - objects: List of serialized model instances + + Example: + >>> querysets = [User.objects.all(), UserGroup.objects.all()] + >>> data = ExportSerializer.dump_data(querysets) + >>> { + ... 'meta': { + ... 'media_version': '1.0', + ... 'exported_at': '2025-01-14T10:00:00Z' + ... }, + ... 'objects': [ + ... { + ... 'model': 'iam.user', + ... 'id': '1', + ... 'fields': {...} + ... }, + ... ... + ... ] + ... 
} + """ + + meta = { + "media_version": settings.VERSION, + "exported_at": timezone.now().isoformat(), + } + + objects = [] + + for queryset in scope: + for obj in queryset: + objects.append( + { + "model": app_dot_model(queryset.model), + "id": sha256(str(obj.id).encode()).hexdigest()[:12], + "fields": import_export_serializer_class(queryset.model)( + obj + ).data, + } + ) + + return {"meta": meta, "objects": objects} diff --git a/backend/serdes/utils.py b/backend/serdes/utils.py index 29f0935b0..609c9f733 100644 --- a/backend/serdes/utils.py +++ b/backend/serdes/utils.py @@ -1,11 +1,140 @@ from typing import Iterable import django.apps -from django.core import serializers -from django.db.models import Model +from django.contrib.contenttypes.models import ContentType +from rest_framework import serializers +from django.db.models import Model, Q from django.db.models.deletion import Collector +from collections import defaultdict from iam.models import Folder +from rest_framework.exceptions import ValidationError +from typing import List, Type, Set, Dict, Optional + +from core.models import ( + Asset, + AppliedControl, + Evidence, + Framework, + Project, + RiskAssessment, + RiskMatrix, + RiskScenario, + ComplianceAssessment, + RequirementAssessment, + Vulnerability, + Threat, + ReferenceControl, + LoadedLibrary, +) + +from ebios_rm.models import ( + EbiosRMStudy, + FearedEvent, + RoTo, + OperationalScenario, + Stakeholder, + StrategicScenario, + AttackPath, +) + +from tprm.models import Entity + +from core.serializers import ( + FolderImportExportSerializer, + AssetImportExportSerializer, + AppliedControlImportExportSerializer, + EvidenceImportExportSerializer, + ProjectImportExportSerializer, + RiskAssessmentImportExportSerializer, + RiskScenarioImportExportSerializer, + ComplianceAssessmentImportExportSerializer, + RequirementAssessmentImportExportSerializer, + VulnerabilityImportExportSerializer, + ThreatImportExportSerializer, + ReferenceControlImportExportSerializer, + FrameworkImportExportSerializer, + RiskMatrixImportExportSerializer, +) + +from ebios_rm.serializers import ( + EbiosRMStudyImportExportSerializer, + FearedEventImportExportSerializer, + RoToImportExportSerializer, + OperationalScenarioImportExportSerializer, + StakeholderImportExportSerializer, + StrategicScenarioImportExportSerializer, + AttackPathImportExportSerializer, +) + +from tprm.serializers import EntityImportExportSerializer + +from django.db import models +from library.serializers import LoadedLibraryImportExportSerializer + 
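One convention is worth calling out before the helpers that follow: an object's `id` in the dump (hashed in `ExportSerializer.dump_data` above) and the values a `HashSlugRelatedField` emits for foreign keys have to collide on purpose, otherwise references could not be rewired at import time. A minimal sketch of that property, assuming the field truncates the digest the same way `dump_data` does; `hash_pk` and the primary-key values are illustrative, not part of the patch:

```python
from hashlib import sha256

def hash_pk(pk) -> str:
    # Same convention as ExportSerializer.dump_data: sha256 of the
    # stringified primary key, truncated to 12 hex characters.
    return sha256(str(pk).encode()).hexdigest()[:12]

# Hypothetical primary keys, for illustration only.
folder_pk = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
evidence_pk = "0b5cbb79-61a7-4f6a-9f3e-1d2c4b5a6e7f"

dumped_folder_id = hash_pk(folder_pk)  # the Folder's own "id" in the dump
dumped_evidence = {
    "model": "core.evidence",  # what app_dot_model(Evidence) would return
    "id": hash_pk(evidence_pk),
    "fields": {"name": "Network diagram", "folder": hash_pk(folder_pk)},
}

# Reference and target id match, so the importer can rebuild the foreign key
# from a simple {hashed_id: newly_created_object} lookup table.
assert dumped_evidence["fields"]["folder"] == dumped_folder_id
```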
+import structlog + +logger = structlog.get_logger(__name__) def get_all_objects(): @@ -46,34 +175,6 @@ def dump_objects(queryset: Iterable[Model], path: str, format: str = "json"): return path -def get_objects_from_folder(folder: Folder) -> set: - """ - Collates all objects in a folder. - - Parameters: - ----------- - folder: Folder - Folder to get objects from. - - Returns: - -------- - objects: list - List of objects in the folder. - """ - objects = set() - # NOTE: This is a hack to get all objects in a folder. - # As all objects contained in a folder are deleted - # when the folder is deleted, we can use the Django - # deletion collector to get all the objects in a folder. - collector = Collector(using="default") - collector.collect([folder]) - - for model, model_instances in collector.data.items(): - objects.update(model_instances) - - return objects - - def restore_objects(path: str, format: str = "json"): """ Restore objects from a file. @@ -91,3 +192,360 @@ for obj in objects: obj.save() return path + + +def app_dot_model(model: Model) -> str: + """ + Get the app label and model name of a model. + e.g. 'app_label.model_name' + """ + content_type = ContentType.objects.get_for_model(model) + return f"{content_type.app_label}.{content_type.model}" + + +def import_export_serializer_class(model: Model) -> serializers.Serializer: + model_serializer_map = { + Folder: FolderImportExportSerializer, + Asset: AssetImportExportSerializer, + AppliedControl: AppliedControlImportExportSerializer, + Evidence: EvidenceImportExportSerializer, + Project: ProjectImportExportSerializer, + RiskAssessment: RiskAssessmentImportExportSerializer, + RiskScenario: RiskScenarioImportExportSerializer, + ComplianceAssessment: ComplianceAssessmentImportExportSerializer, + RequirementAssessment: RequirementAssessmentImportExportSerializer, + Vulnerability: VulnerabilityImportExportSerializer, + Threat: ThreatImportExportSerializer, + ReferenceControl: ReferenceControlImportExportSerializer, + EbiosRMStudy: EbiosRMStudyImportExportSerializer, + FearedEvent: FearedEventImportExportSerializer, + RoTo: RoToImportExportSerializer, + OperationalScenario: OperationalScenarioImportExportSerializer, + Stakeholder: StakeholderImportExportSerializer, + Entity: EntityImportExportSerializer, + StrategicScenario: StrategicScenarioImportExportSerializer, + AttackPath: AttackPathImportExportSerializer, + Framework: FrameworkImportExportSerializer, + RiskMatrix: RiskMatrixImportExportSerializer, + LoadedLibrary: LoadedLibraryImportExportSerializer, + } + + return model_serializer_map.get(model, None) + + +def get_model_dependencies( + model: Type[models.Model], all_models: Set[Type[models.Model]] +) -> List[Type[models.Model]]: + """ + Get all required dependencies for a model. 
+ + Args: + model: The model to analyze + all_models: Set of models to consider as valid dependencies + + Returns: + List of model classes that this model depends on + """ + dependencies = [] + + logger.debug("Getting model dependencies", model=model) + for field in model._meta.get_fields(): + if not field.is_relation or field.related_model not in all_models: + continue + + # Direct FK, O2O and M2M fields are the real dependencies; reverse + # accessors (ManyToOneRel, ManyToManyRel) are skipped. + if isinstance( + field, (models.ForeignKey, models.OneToOneField, models.ManyToManyField) + ): + dependencies.append(field.related_model) + + return dependencies + + +def build_dependency_graph( + models: List[Type[models.Model]], +) -> Dict[Type[models.Model], List[Type[models.Model]]]: + """ + Build a dependency graph from a list of models. + + Args: + models: List of model classes to analyze + + Returns: + Dictionary mapping models to their dependencies + """ + models_set = set(models + [Folder]) + graph = defaultdict(list) + + logger.debug("Building dependency graph", models=models) + + for model in models: + dependencies = get_model_dependencies(model, models_set) + + logger.debug("Model dependencies", model=model, dependencies=dependencies) + + if dependencies: + graph[model].extend(dependencies) + + logger.debug("Dependency graph", graph=graph) + return graph + + +def topological_sort( + graph: Dict[Type[models.Model], List[Type[models.Model]]], +) -> List[Type[models.Model]]: + """ + Perform a topological sort with cycle detection. + + Args: + graph: Dependency graph + + Returns: + List of models in dependency order + + Raises: + ValidationError: If a dependency cycle is detected + """ + result = [] + permanent_marks = set() + temporary_marks = set() + + def visit(node): + if node in temporary_marks: + cycle_path = " -> ".join(m.__name__ for m in temporary_marks) + raise ValidationError(f"Circular dependency detected: {cycle_path}") + + if node not in permanent_marks: + temporary_marks.add(node) + + for neighbor in graph.get(node, []): + if neighbor == node: + continue + visit(neighbor) + + temporary_marks.remove(node) + permanent_marks.add(node) + result.append(node) + + for node in graph: + if node not in permanent_marks: + visit(node) + + return result + + +def get_self_referencing_field(model: Type[models.Model]) -> Optional[str]: + """ + Find self-referencing field in a model. + + Args: + model: Model class to analyze + + Returns: + Field name if found, None otherwise + """ + return next( + ( + field.field.name + for field in model._meta.get_fields() + if field.related_model == model + ), + None, + ) + + +def sort_objects_by_self_reference( + objects: List[dict], self_ref_field: str +) -> List[dict]: + """ + Sort objects by their hierarchical relationship when the self-referencing field + can contain multiple parent IDs. Missing parents are ignored. + + Args: + objects: List of object dictionaries. + self_ref_field: Name of the self-referencing field, which may contain a single + parent ID or a list of parent IDs. + + Returns: + List[dict]: Sorted list of objects. + + Raises: + ValidationError: If circular references are detected. 
+ """ + object_map = {obj["id"]: obj for obj in objects} + + # Build dependency graph + graph = defaultdict(list) + roots = set(object_map.keys()) + + for obj in objects: + parent_ids = obj["fields"].get(self_ref_field, []) + if isinstance(parent_ids, str) or isinstance(parent_ids, int): + parent_ids = [parent_ids] # Ensure it's a list + + for parent_id in parent_ids: + if parent_id in object_map: + graph[parent_id].append(obj["id"]) + roots.discard(obj["id"]) # Remove this ID from root candidates + + # Sort with cycle detection + sorted_ids = [] + visited = set() + temp_visited = set() + + def visit(obj_id): + if obj_id in temp_visited: + path = " -> ".join(str(id) for id in temp_visited) + raise ValidationError(f"Circular reference detected: {path}") + + if obj_id not in visited: + temp_visited.add(obj_id) + + for child_id in graph.get(obj_id, []): + visit(child_id) + + temp_visited.remove(obj_id) + visited.add(obj_id) + sorted_ids.append(obj_id) + + # Process from roots + for root_id in roots: + visit(root_id) + + # Ensure all objects were processed + if len(visited) != len(objects): + raise ValidationError("Detected objects unreachable from root") + + return [object_map[obj_id] for obj_id in reversed(sorted_ids)] + + +def get_domain_export_objects(domain: Folder): + folders = ( + Folder.objects.filter( + Q(id=domain.id) | Q(id__in=[f.id for f in domain.get_sub_folders()]) + ) + .filter(content_type=Folder.ContentType.DOMAIN) + .distinct() + ) + projects = Project.objects.filter(folder__in=folders).distinct() + + risk_assessments = RiskAssessment.objects.filter( + Q(project__in=projects) | Q(folder__in=folders) + ).distinct() + risk_scenarios = RiskScenario.objects.filter( + risk_assessment__in=risk_assessments + ).distinct() + + ebios_rm_studies = EbiosRMStudy.objects.filter(folder__in=folders).distinct() + feared_events = FearedEvent.objects.filter( + ebios_rm_study__in=ebios_rm_studies + ).distinct() + ro_tos = RoTo.objects.filter(ebios_rm_study__in=ebios_rm_studies).distinct() + strategic_scenarios = StrategicScenario.objects.filter( + ebios_rm_study__in=ebios_rm_studies + ).distinct() + attack_paths = AttackPath.objects.filter( + ebios_rm_study__in=ebios_rm_studies + ).distinct() + operational_scenarios = OperationalScenario.objects.filter( + ebios_rm_study__in=ebios_rm_studies + ).distinct() + stakeholders = Stakeholder.objects.filter( + ebios_rm_study__in=ebios_rm_studies + ).distinct() + + risk_matrices = RiskMatrix.objects.filter( + Q(folder__in=folders) + | Q(riskassessment__in=risk_assessments) + | Q(ebios_rm_studies__in=ebios_rm_studies) + ).distinct() + + compliance_assessments = ComplianceAssessment.objects.filter( + Q(project__in=projects) + | Q(folder__in=folders) + | Q(ebios_rm_studies__in=ebios_rm_studies) + ).distinct() + requirement_assessments = RequirementAssessment.objects.filter( + compliance_assessment__in=compliance_assessments + ).distinct() + frameworks = Framework.objects.filter( + Q(folder__in=folders) | Q(complianceassessment__in=compliance_assessments) + ).distinct() + + entities = Entity.objects.filter( + Q(folder__in=folders) + | Q(stakeholders__in=stakeholders) + | Q(ebios_rm_studies__in=ebios_rm_studies) + ).distinct() + + assets = Asset.objects.filter( + Q(folder__in=folders) + | Q(risk_scenarios__in=risk_scenarios) + | Q(ebios_rm_studies__in=ebios_rm_studies) + | Q(feared_events__in=feared_events) + ).distinct() + + vulnerabilities = Vulnerability.objects.filter( + Q(folder__in=folders) | Q(risk_scenarios__in=risk_scenarios) + ).distinct() + + 
applied_controls = AppliedControl.objects.filter( + Q(folder__in=folders) + | Q(risk_scenarios__in=risk_scenarios) + | Q(risk_scenarios_e__in=risk_scenarios) + | Q(requirement_assessments__in=requirement_assessments) + | Q(stakeholders__in=stakeholders) + | Q(vulnerabilities__in=vulnerabilities) + ).distinct() + + reference_controls = ReferenceControl.objects.filter( + Q(folder__in=folders) | Q(appliedcontrol__in=applied_controls) + ).distinct() + + threats = Threat.objects.filter( + Q(folder__in=folders) + | Q(risk_scenarios__in=risk_scenarios) + | Q(operational_scenarios__in=operational_scenarios) + ).distinct() + + evidences = Evidence.objects.filter( + Q(folder__in=folders) + | Q(applied_controls__in=applied_controls) + | Q(requirement_assessments__in=requirement_assessments) + ).distinct() + + loaded_libraries = LoadedLibrary.objects.filter( + Q(folder__in=folders) + | Q(threats__in=threats) + | Q(reference_controls__in=reference_controls) + | Q(risk_matrices__in=risk_matrices) + | Q(frameworks__in=frameworks) + ).distinct() + + return { + # "folder": folders, + "loadedlibrary": loaded_libraries, + "vulnerability": vulnerabilities, + "framework": frameworks, + "riskmatrix": risk_matrices, + "referencecontrol": reference_controls, + "threat": threats, + "asset": assets, + "appliedcontrol": applied_controls, + "entity": entities, + "evidence": evidences, + "project": projects, + "complianceassessment": compliance_assessments, + "requirementassessment": requirement_assessments, + "ebiosrmstudy": ebios_rm_studies, + "riskassessment": risk_assessments, + "riskscenario": risk_scenarios, + "fearedevent": feared_events, + "roto": ro_tos, + "operationalscenario": operational_scenarios, + "stakeholder": stakeholders, + "strategicscenario": strategic_scenarios, + "attackpath": attack_paths, + } diff --git a/backend/tprm/serializers.py b/backend/tprm/serializers.py index b2a2703cd..08b0b98fd 100644 --- a/backend/tprm/serializers.py +++ b/backend/tprm/serializers.py @@ -2,7 +2,7 @@ from ciso_assistant.settings import EMAIL_HOST, EMAIL_HOST_RESCUE from core.models import ComplianceAssessment, Framework -from core.serializer_fields import FieldsRelatedField +from core.serializer_fields import FieldsRelatedField, HashSlugRelatedField from core.serializers import BaseModelSerializer from core.utils import RoleCodename, UserGroupCodename from iam.models import Folder, Role, RoleAssignment, UserGroup @@ -32,6 +32,24 @@ class Meta: exclude = ["owned_folders"] +class EntityImportExportSerializer(BaseModelSerializer): + folder = HashSlugRelatedField(slug_field="pk", read_only=True) + owned_folders = HashSlugRelatedField(slug_field="pk", many=True, read_only=True) + + class Meta: + model = Entity + fields = [ + "name", + "description", + "folder", + "mission", + "reference_link", + "owned_folders", + "created_at", + "updated_at", + ] + + class EntityAssessmentReadSerializer(BaseModelSerializer): compliance_assessment = FieldsRelatedField() evidence = FieldsRelatedField() diff --git a/enterprise/frontend/src/lib/components/Forms/ModelForm/FolderForm.svelte b/enterprise/frontend/src/lib/components/Forms/ModelForm/FolderForm.svelte index f9123d39a..f435ccf3f 100644 --- a/enterprise/frontend/src/lib/components/Forms/ModelForm/FolderForm.svelte +++ b/enterprise/frontend/src/lib/components/Forms/ModelForm/FolderForm.svelte @@ -4,20 +4,36 @@ import * as m from '$paraglide/messages.js'; import type { SuperValidated } from 'sveltekit-superforms'; import AutocompleteSelect from '../AutocompleteSelect.svelte'; + 
import TextField from '$lib/components/Forms/TextField.svelte'; + import FileInput from '../FileInput.svelte'; export let form: SuperValidated; export let model: ModelInfo; export let cacheLocks: Record = {}; export let formDataCache: Record = {}; export let initialData: Record = {}; + export let importFolder: boolean = false; - +{#if importFolder} + + +{:else} + +{/if} + diff --git a/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.server.ts b/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.server.ts new file mode 100644 index 000000000..e2fb2bca3 --- /dev/null +++ b/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.server.ts @@ -0,0 +1,7 @@ +import { getModelInfo } from '$lib/utils/crud'; +import { loadDetail } from '$lib/utils/load'; +import type { Actions, PageServerLoad } from './$types'; + +export const load: PageServerLoad = async (event) => { + return loadDetail({ event, model: getModelInfo('folders'), id: event.params.id }); +}; diff --git a/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.svelte b/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.svelte new file mode 100644 index 000000000..2476fe7b4 --- /dev/null +++ b/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/+page.svelte @@ -0,0 +1,25 @@ + + + +
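The `export/+server.ts` route that follows only proxies the backend's export action and renames the download. As a hedged sketch of what that upstream endpoint assembles from this patch's helpers (the view wiring below is illustrative; the patch's actual view also packages evidence attachments alongside the dump, which this sketch leaves out):

```python
import gzip
import json

from serdes.serializers import ExportSerializer
from serdes.utils import get_domain_export_objects

def build_domain_dump(domain) -> bytes:
    """Serialize every exportable object under `domain` and gzip the result."""
    # get_domain_export_objects returns {model_name: queryset}; dump_data
    # wants the querysets themselves, in the referential-first order the
    # dict already encodes.
    scope = list(get_domain_export_objects(domain).values())
    data = ExportSerializer.dump_data(scope)
    return gzip.compress(json.dumps(data).encode("utf-8"))
```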
    diff --git a/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/export/+server.ts b/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/export/+server.ts new file mode 100644 index 000000000..330a74537 --- /dev/null +++ b/enterprise/frontend/src/routes/(app)/(internal)/folders/[id=uuid]/export/+server.ts @@ -0,0 +1,22 @@ +import { BASE_API_URL } from '$lib/utils/constants'; + +import { error } from '@sveltejs/kit'; +import type { RequestHandler } from './$types'; + +export const GET: RequestHandler = async ({ fetch, params }) => { + const endpoint = `${BASE_API_URL}/folders/${params.id}/export/`; + + const res = await fetch(endpoint); + if (!res.ok) { + error(400, 'Error fetching the dump file'); + } + + const fileName = `ciso-assistant-domain-export-${new Date().toISOString()}.bak`; + + return new Response(await res.blob(), { + headers: { + 'Content-Type': 'application/zip', + 'Content-Disposition': `attachment; filename="${fileName}"` + } + }); +}; diff --git a/frontend/messages/en.json b/frontend/messages/en.json index 98da0d81e..29d10f325 100644 --- a/frontend/messages/en.json +++ b/frontend/messages/en.json @@ -1087,5 +1087,11 @@ "documentationScore": "Documentation score", "implementationScore": "Implementation score", "useDocumentationScore": "Use documentation score", - "useDocumentationScoreHelpText": "A second score (the documentation score) will be available for scoring the requirements." + "useDocumentationScoreHelpText": "A second score (the documentation score) will be available for scoring the requirements.", + "importFolder": "Import domain", + "importFolderHelpText": "Import a domain from a JSON or ZIP file", + "importVersionNotCompatibleWithCurrentVersion": "The import version is not compatible with the current version of the application", + "errorOccuredDuringImport": "An error occurred during the import", + "successfullyImportedFolder": "Folder successfully imported", + "missingLibrariesInImport": "Some libraries are missing, see the list above" } diff --git a/frontend/messages/fr.json b/frontend/messages/fr.json index 4a143ccdb..247401b1e 100644 --- a/frontend/messages/fr.json +++ b/frontend/messages/fr.json @@ -1087,5 +1087,11 @@ "documentationScore": "Score de documentation", "implementationScore": "Score d'implémentation", "useDocumentationScore": "Utiliser le score de documentation", - "useDocumentationScoreHelpText": "Un deuxième score (le score de documentation) sera disponible pour évaluer les exigences." + "useDocumentationScoreHelpText": "Un deuxième score (le score de documentation) sera disponible pour évaluer les exigences.", + "importFolder": "Importer un domaine", + "importFolderHelpText": "Importer un domaine à partir d'un ZIP ou fichier JSON", + "importVersionNotCompatibleWithCurrentVersion": "La version de l'import n'est pas compatible avec la version actuelle de l'application.", + "errorOccuredDuringImport": "Une erreur s'est produite lors de l'import", + "successfullyImportedFolder": "Domaine importé avec succès", + "missingLibrariesInImport": "Certaines bibliothèques sont manquantes, voir la liste ci-dessus" } diff --git a/frontend/src/lib/components/DetailView/DetailView.svelte b/frontend/src/lib/components/DetailView/DetailView.svelte index 2c9f99f4e..c15173e8b 100644 --- a/frontend/src/lib/components/DetailView/DetailView.svelte +++ b/frontend/src/lib/components/DetailView/DetailView.svelte @@ -392,6 +392,7 @@ {/if}
    {/if} +
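On the import side, the frontend pieces below just deliver the file; the backend has to replay object creation parents-first. The ordering primitives from `backend/serdes/utils.py` compose roughly like this (a sketch: `models_in_dump` and `objects_by_model` stand in for data parsed out of the uploaded dump):

```python
from serdes.utils import (
    build_dependency_graph,
    get_self_referencing_field,
    sort_objects_by_self_reference,
    topological_sort,
)

def creation_plan(models_in_dump, objects_by_model):
    """Return (model, raw objects) pairs in a dependency-safe creation order."""
    # topological_sort lists dependencies before dependents, so iterating
    # the result front-to-back creates referenced objects first.
    ordered = topological_sort(build_dependency_graph(models_in_dump))
    # Models with no relations never enter the graph; any position works.
    ordered += [m for m in models_in_dump if m not in ordered]

    plan = []
    for model in ordered:
        objs = objects_by_model.get(model, [])
        # Self-referencing models (e.g. Folder.parent_folder) additionally
        # need parents created before children within the same model.
        if self_ref_field := get_self_referencing_field(model):
            objs = sort_objects_by_self_reference(objs, self_ref_field)
        plan.append((model, objs))
    return plan
```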
    diff --git a/frontend/src/lib/components/Forms/ModelForm.svelte b/frontend/src/lib/components/Forms/ModelForm.svelte index cfa315880..35f596987 100644 --- a/frontend/src/lib/components/Forms/ModelForm.svelte +++ b/frontend/src/lib/components/Forms/ModelForm.svelte @@ -60,6 +60,7 @@ export let suggestions: { [key: string]: any } = {}; export let cancelButton = true; export let duplicate = false; + export let importFolder = false; export let customNameDescription = false; const URLModel = model.urlModel as urlModel; @@ -189,7 +190,7 @@ {#if URLModel === 'projects'} {:else if URLModel === 'folders'} - + {:else if URLModel === 'risk-assessments'} + import TextField from '$lib/components/Forms/TextField.svelte'; + import type { CacheLock, ModelInfo } from '$lib/utils/types'; + import * as m from '$paraglide/messages.js'; + import type { SuperValidated } from 'sveltekit-superforms'; + import FileInput from '../FileInput.svelte'; + + export let form: SuperValidated; + export let model: ModelInfo; + export let cacheLocks: Record = {}; + export let formDataCache: Record = {}; + export let initialData: Record = {}; + export let object: any = {}; + export let importFolder: boolean = false; + +{#if importFolder} + + +{/if} diff --git a/frontend/src/lib/components/Modals/CreateModal.svelte b/frontend/src/lib/components/Modals/CreateModal.svelte index ac563ce75..ee7c70c73 100644 --- a/frontend/src/lib/components/Modals/CreateModal.svelte +++ b/frontend/src/lib/components/Modals/CreateModal.svelte @@ -12,6 +12,7 @@ export let form: SuperValidated; export let customNameDescription = false; + export let importFolder = false; export let model: ModelInfo; export let duplicate = false; export let invalidateAll = true; // set to false to keep form data using muliple forms on a page @@ -49,6 +50,7 @@ {/if} diff --git a/frontend/src/lib/utils/actions.ts b/frontend/src/lib/utils/actions.ts index c869da8fa..976e14513 100644 --- a/frontend/src/lib/utils/actions.ts +++ b/frontend/src/lib/utils/actions.ts @@ -75,11 +75,11 @@ export async function handleErrorResponse({ res['filtering_labels'] = res.label; } if (res.warning) { - setFlash({ type: 'warning', message: res.warning }, event); + setFlash({ type: 'warning', message: safeTranslate(res.warning) }, event); return { form }; } if (res.error) { - setFlash({ type: 'error', message: res.error }, event); + setFlash({ type: 'error', message: safeTranslate(res.error) }, event); return { form }; } Object.entries(res).forEach(([key, value]) => { diff --git a/frontend/src/lib/utils/schemas.ts b/frontend/src/lib/utils/schemas.ts index 3a14f4f6a..00b53bb3f 100644 --- a/frontend/src/lib/utils/schemas.ts +++ b/frontend/src/lib/utils/schemas.ts @@ -67,6 +67,11 @@ export const FolderSchema = z.object({ parent_folder: z.string().optional() }); +export const FolderImportSchema = z.object({ + name: nameSchema, + file: z.instanceof(File) +}); + export const ProjectSchema = z.object({ ...NameDescriptionMixin, folder: z.string(), @@ -476,6 +481,7 @@ export const operationalScenarioSchema = z.object({ const SCHEMA_MAP: Record = { folders: FolderSchema, + 'folders-import': FolderImportSchema, projects: ProjectSchema, 'risk-matrices': RiskMatrixSchema, 'risk-assessments': RiskAssessmentSchema, diff --git a/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.server.ts b/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.server.ts index 2b5f03185..f40f7448d 100644 --- a/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.server.ts +++ 
b/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.server.ts @@ -8,10 +8,13 @@ import { import { modelSchema } from '$lib/utils/schemas'; import type { ModelInfo } from '$lib/utils/types'; import { type Actions } from '@sveltejs/kit'; -import { superValidate } from 'sveltekit-superforms'; +import { fail, superValidate, withFiles, setError } from 'sveltekit-superforms'; import { zod } from 'sveltekit-superforms/adapters'; import { z } from 'zod'; import type { PageServerLoad } from './$types'; +import { setFlash } from 'sveltekit-flash-message/server'; +import * as m from '$paraglide/messages'; +import { safeTranslate } from '$lib/utils/i18n'; export const load: PageServerLoad = async ({ params, fetch }) => { const schema = z.object({ id: z.string().uuid() }); @@ -63,6 +66,13 @@ export const load: PageServerLoad = async ({ params, fetch }) => { model['selectOptions'] = selectOptions; + if (model.urlModel === 'folders') { + const folderImportForm = await superValidate(zod(modelSchema('folders-import')), { + errors: false + }); + model['folderImportForm'] = folderImportForm; + } + return { createForm, deleteForm, model, URLModel }; }; @@ -78,5 +88,58 @@ export const actions: Actions = { }, delete: async (event) => { return defaultDeleteFormAction({ event, urlModel: event.params.model! }); + }, + importFolder: async (event) => { + const formData = Object.fromEntries(await event.request.formData()); + if (!formData) return fail(400, { error: 'No form data' }); + + const form = await superValidate(formData, zod(modelSchema('folders-import'))); + if (!form.valid) { + return fail(400, { form }); + } + + const { file } = formData as { file: File }; + + const endpoint = `${BASE_API_URL}/folders/import/`; + + const response = await event.fetch(endpoint, { + method: 'POST', + headers: { + 'Content-Disposition': `attachment; filename="${file.name}"`, + 'Content-Type': file.type, + 'X-CISOAssistantDomainName': form.data.name + }, + body: file + }); + const res = await response.json(); + + if (!response.ok && res.missing_libraries) { + setError(form, 'file', m.missingLibrariesInImport()); + for (const value of res.missing_libraries) { + setError(form, 'non_field_errors', value); + } + return fail(400, { form }); + } + + if (!response.ok) { + if (res.error) { + setFlash({ type: 'error', message: safeTranslate(res.error) }, event); + return { form }; + } + Object.entries(res).forEach(([key, value]) => { + setError(form, key, safeTranslate(value)); + }); + return fail(400, { form }); + } + + setFlash( + { + type: 'success', + message: m.successfullyImportedFolder() + }, + event + ); + + return withFiles({ form }); } }; diff --git a/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.svelte b/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.svelte index e06a1442a..f3a8ec84d 100644 --- a/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.svelte +++ b/frontend/src/routes/(app)/(internal)/[model=urlmodel]/+page.svelte @@ -48,6 +48,40 @@ modalStore.trigger(modal); } + function modalFolderImportForm(): void { + let modalComponent: ModalComponent = { + ref: CreateModal, + props: { + form: data.model['folderImportForm'], + model: data.model, + customNameDescription: true, + importFolder: true, + formAction: '?/importFolder', + enctype: 'multipart/form-data', + dataType: 'form' + } + }; + let modal: ModalSettings = { + type: 'component', + component: modalComponent, + // Data + title: safeTranslate('importFolder') + }; + if (checkConstraints(data.createForm.constraints, 
data.model.foreignKeys).length > 0) { + modalComponent = { + ref: MissingConstraintsModal + }; + modal = { + type: 'component', + component: modalComponent, + title: m.warning(), + body: safeTranslate('add-' + data.model.localName).toLowerCase(), + value: checkConstraints(data.createForm.constraints, data.model.foreignKeys) + }; + } + modalStore.trigger(modal); + } + function handleKeyDown(event: KeyboardEvent) { if (event.metaKey || event.ctrlKey) return; if (document.activeElement?.tagName !== 'BODY') return; @@ -122,6 +156,13 @@ > {/if} {#if URLModel === 'folders'} +
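Taken together, the export proxy and the `importFolder` action give the feature a scriptable round trip. A hedged sketch against a running instance, using `requests`: the base URL, the token auth scheme and the `.bak` filename are assumptions, while the paths and the `X-CISOAssistantDomainName` header mirror the SvelteKit code above:

```python
import requests

BASE = "https://localhost:8443/api"  # hypothetical deployment URL
HEADERS = {"Authorization": "Token s3cr3t"}  # hypothetical auth scheme
DOMAIN_ID = "3fa85f64-5717-4562-b3fc-2c963f66afa6"  # hypothetical domain pk

# Export: the same call the GET /folders/[id]/export/ proxy makes upstream.
dump = requests.get(f"{BASE}/folders/{DOMAIN_ID}/export/", headers=HEADERS)
dump.raise_for_status()
with open("domain.bak", "wb") as fh:
    fh.write(dump.content)

# Import: mirrors the importFolder form action, which streams the file body
# and carries the new domain name in a header.
with open("domain.bak", "rb") as fh:
    resp = requests.post(
        f"{BASE}/folders/import/",
        data=fh,
        headers={
            **HEADERS,
            "X-CISOAssistantDomainName": "restored-domain",
            "Content-Disposition": 'attachment; filename="domain.bak"',
        },
    )
resp.raise_for_status()
```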