diff --git a/dojo/finding/deduplication.py b/dojo/finding/deduplication.py new file mode 100644 index 00000000000..013b42b9277 --- /dev/null +++ b/dojo/finding/deduplication.py @@ -0,0 +1,561 @@ +import logging +from operator import attrgetter + +import hyperlink +from django.conf import settings +from django.db.models import Prefetch +from django.db.models.query_utils import Q + +from dojo.celery import app +from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id +from dojo.models import Finding, System_Settings + +logger = logging.getLogger(__name__) +deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication") + + +def get_finding_models_for_deduplication(finding_ids): + """ + Load findings with optimal prefetching for deduplication operations. + This avoids N+1 queries when accessing test, engagement, product, endpoints, and original_finding. + + Args: + finding_ids: A list of Finding IDs + + Returns: + A list of Finding models with related objects prefetched + + """ + if not finding_ids: + return [] + + return list( + Finding.objects.filter(id__in=finding_ids) + .select_related("test", "test__engagement", "test__engagement__product", "test__test_type") + .prefetch_related( + "endpoints", + # Prefetch duplicates of each finding to avoid N+1 when set_duplicate iterates + Prefetch( + "original_finding", + queryset=Finding.objects.only("id", "duplicate_finding_id").order_by("-id"), + ), + ), + ) + + +@dojo_model_to_id +@dojo_async_task +@app.task +@dojo_model_from_id +def do_dedupe_finding_task(new_finding, *args, **kwargs): + return do_dedupe_finding(new_finding, *args, **kwargs) + + +@dojo_async_task +@app.task +def do_dedupe_batch_task(finding_ids, *args, **kwargs): + """ + Async task to deduplicate a batch of findings. The findings are assumed to be in the same test. + Similar to post_process_findings_batch but focused only on deduplication. 
+ """ + # Load findings with proper prefetching + findings = get_finding_models_for_deduplication(finding_ids) + + if not findings: + logger.debug(f"no findings found for batch deduplication with IDs: {finding_ids}") + return + + # Batch dedupe + dedupe_batch_of_findings(findings) + + +def do_dedupe_finding(new_finding, *args, **kwargs): + from dojo.utils import get_custom_method # noqa: PLC0415 -- circular import + if dedupe_method := get_custom_method("FINDING_DEDUPE_METHOD"): + return dedupe_method(new_finding, *args, **kwargs) + + try: + enabled = System_Settings.objects.get(no_cache=True).enable_deduplication + except System_Settings.DoesNotExist: + logger.warning("system settings not found") + enabled = False + + if enabled: + deduplicationLogger.debug("dedupe for: " + str(new_finding.id) + + ":" + str(new_finding.title)) + deduplicationAlgorithm = new_finding.test.deduplication_algorithm + deduplicationLogger.debug("deduplication algorithm: " + deduplicationAlgorithm) + if deduplicationAlgorithm == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL: + deduplicate_unique_id_from_tool(new_finding) + elif deduplicationAlgorithm == settings.DEDUPE_ALGO_HASH_CODE: + deduplicate_hash_code(new_finding) + elif deduplicationAlgorithm == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE: + deduplicate_uid_or_hash_code(new_finding) + else: + deduplicationLogger.debug("no configuration per parser found; using legacy algorithm") + deduplicate_legacy(new_finding) + else: + deduplicationLogger.debug("dedupe: skipping dedupe because it's disabled in system settings get()") + return None + + +def deduplicate_legacy(new_finding): + _dedupe_batch_legacy([new_finding]) + + +def deduplicate_unique_id_from_tool(new_finding): + _dedupe_batch_unique_id([new_finding]) + + +def deduplicate_hash_code(new_finding): + _dedupe_batch_hash_code([new_finding]) + + +def deduplicate_uid_or_hash_code(new_finding): + _dedupe_batch_uid_or_hash([new_finding]) + + +def set_duplicate(new_finding, existing_finding): + deduplicationLogger.debug(f"new_finding.status(): {new_finding.id} {new_finding.status()}") + deduplicationLogger.debug(f"existing_finding.status(): {existing_finding.id} {existing_finding.status()}") + if existing_finding.duplicate: + deduplicationLogger.debug("existing finding: %s:%s:duplicate=%s;duplicate_finding=%s", existing_finding.id, existing_finding.title, existing_finding.duplicate, existing_finding.duplicate_finding.id if existing_finding.duplicate_finding else "None") + msg = "Existing finding is a duplicate" + raise Exception(msg) + if existing_finding.id == new_finding.id: + msg = "Can not add duplicate to itself" + raise Exception(msg) + if is_duplicate_reopen(new_finding, existing_finding): + msg = "Found a regression. 
Ignore this so that a new duplicate chain can be made" + raise Exception(msg) + if new_finding.duplicate and finding_mitigated(existing_finding): + msg = "Skip this finding as we do not want to attach a new duplicate to a mitigated finding" + raise Exception(msg) + + deduplicationLogger.debug("Setting new finding " + str(new_finding.id) + " as a duplicate of existing finding " + str(existing_finding.id)) + new_finding.duplicate = True + new_finding.active = False + new_finding.verified = False + new_finding.duplicate_finding = existing_finding + + # Make sure transitive duplication is flattened + # if A -> B and B is made a duplicate of C here, afterwards: + # A -> C and B -> C should be true + # Ordering is ensured by the prefetch in post_process_findings_batch + # (we prefetch "original_finding" ordered by -id), so avoid calling + # order_by here to prevent bypassing the prefetch cache. + for find in new_finding.original_finding.all(): + new_finding.original_finding.remove(find) + set_duplicate(find, existing_finding) + existing_finding.found_by.add(new_finding.test.test_type) + logger.debug("saving new finding: %d", new_finding.id) + super(Finding, new_finding).save() + logger.debug("saving existing finding: %d", existing_finding.id) + super(Finding, existing_finding).save() + + +def is_duplicate_reopen(new_finding, existing_finding) -> bool: + return finding_mitigated(existing_finding) and finding_not_human_set_status(existing_finding) and not finding_mitigated(new_finding) + + +def finding_mitigated(finding: Finding) -> bool: + return finding.active is False and (finding.is_mitigated is True or finding.mitigated is not None) + + +def finding_not_human_set_status(finding: Finding) -> bool: + return finding.out_of_scope is False and finding.false_p is False + + +def set_duplicate_reopen(new_finding, existing_finding): + logger.debug("duplicate reopen existing finding") + existing_finding.mitigated = new_finding.mitigated + existing_finding.is_mitigated = new_finding.is_mitigated + existing_finding.active = new_finding.active + existing_finding.verified = new_finding.verified + existing_finding.notes.create(author=existing_finding.reporter, + entry="This finding has been automatically re-opened as it was found in recent scans.") + existing_finding.save() + + +def is_deduplication_on_engagement_mismatch(new_finding, to_duplicate_finding): + if new_finding.test.engagement != to_duplicate_finding.test.engagement: + deduplication_mismatch = new_finding.test.engagement.deduplication_on_engagement \ + or to_duplicate_finding.test.engagement.deduplication_on_engagement + if deduplication_mismatch: + deduplicationLogger.debug(f"deduplication_mismatch: {deduplication_mismatch} for new_finding {new_finding.id} and to_duplicate_finding {to_duplicate_finding.id} with test.engagement {new_finding.test.engagement.id} and {to_duplicate_finding.test.engagement.id}") + return deduplication_mismatch + return False + + +def get_endpoints_as_url(finding): + return [hyperlink.parse(str(e)) for e in finding.endpoints.all()] + + +def are_urls_equal(url1, url2, fields): + deduplicationLogger.debug("Check if url %s and url %s are equal in terms of %s.", url1, url2, fields) + for field in fields: + if (field == "scheme" and url1.scheme != url2.scheme) or (field == "host" and url1.host != url2.host): + return False + if (field == "port" and url1.port != url2.port) or (field == "path" and url1.path != url2.path) or (field == "query" and url1.query != url2.query) or (field == "fragment" and url1.fragment != 
url2.fragment) or (field == "userinfo" and url1.userinfo != url2.userinfo) or (field == "user" and url1.user != url2.user): + return False + return True + + +def are_endpoints_duplicates(new_finding, to_duplicate_finding): + fields = settings.DEDUPE_ALGO_ENDPOINT_FIELDS + if len(fields) == 0: + deduplicationLogger.debug("deduplication by endpoint fields is disabled") + return True + + list1 = get_endpoints_as_url(new_finding) + list2 = get_endpoints_as_url(to_duplicate_finding) + + deduplicationLogger.debug( + f"Starting deduplication by endpoint fields for finding {new_finding.id} with urls {list1} and finding {to_duplicate_finding.id} with urls {list2}", + ) + if list1 == [] and list2 == []: + return True + + for l1 in list1: + for l2 in list2: + if are_urls_equal(l1, l2, fields): + return True + + deduplicationLogger.debug(f"endpoints are not duplicates: {new_finding.id} and {to_duplicate_finding.id}") + return False + + +def build_dedupe_scope_queryset(test): + scope_on_engagement = test.engagement.deduplication_on_engagement + if scope_on_engagement: + scope_q = Q(test__engagement=test.engagement) + else: + # Product scope limited to current product, but exclude engagements that opted into engagement-scoped dedupe + scope_q = Q(test__engagement__product=test.engagement.product) & ( + Q(test__engagement=test.engagement) + | Q(test__engagement__deduplication_on_engagement=False) + ) + + return ( + Finding.objects.filter(scope_q) + .select_related("test", "test__engagement", "test__test_type") + .prefetch_related("endpoints") + ) + + +def find_candidates_for_deduplication_hash(test, findings): + base_queryset = build_dedupe_scope_queryset(test) + hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None) is not None} + if not hash_codes: + return {} + existing_qs = ( + base_queryset.filter(hash_code__in=hash_codes) + .exclude(hash_code=None) + .exclude(duplicate=True) + .order_by("id") + ) + existing_by_hash = {} + for ef in existing_qs: + existing_by_hash.setdefault(ef.hash_code, []).append(ef) + deduplicationLogger.debug(f"Found {len(existing_by_hash)} existing findings by hash codes") + return existing_by_hash + + +def find_candidates_for_deduplication_unique_id(test, findings): + base_queryset = build_dedupe_scope_queryset(test) + unique_ids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None) is not None} + if not unique_ids: + return {} + existing_qs = base_queryset.filter(unique_id_from_tool__in=unique_ids).exclude(unique_id_from_tool=None).exclude(duplicate=True).order_by("id") + # unique_id_from_tool can only apply to the same test_type because it is parser dependent + existing_qs = existing_qs.filter(test__test_type=test.test_type) + existing_by_uid = {} + for ef in existing_qs: + existing_by_uid.setdefault(ef.unique_id_from_tool, []).append(ef) + deduplicationLogger.debug(f"Found {len(existing_by_uid)} existing findings by unique IDs") + return existing_by_uid + + +def find_candidates_for_deduplication_uid_or_hash(test, findings): + base_queryset = build_dedupe_scope_queryset(test) + hash_codes = {f.hash_code for f in findings if getattr(f, "hash_code", None) is not None} + unique_ids = {f.unique_id_from_tool for f in findings if getattr(f, "unique_id_from_tool", None) is not None} + if not hash_codes and not unique_ids: + return {}, {} + + cond = Q() + if hash_codes: + cond |= Q(hash_code__isnull=False, hash_code__in=hash_codes) + if unique_ids: + # unique_id_from_tool can only apply to the same test_type because it is parser 
dependent + uid_q = Q(unique_id_from_tool__isnull=False, unique_id_from_tool__in=unique_ids) & Q(test__test_type=test.test_type) + cond |= uid_q + + existing_qs = base_queryset.filter(cond).exclude(duplicate=True).order_by("id") + + existing_by_hash = {} + existing_by_uid = {} + for ef in existing_qs: + if ef.hash_code is not None: + existing_by_hash.setdefault(ef.hash_code, []).append(ef) + if ef.unique_id_from_tool is not None: + existing_by_uid.setdefault(ef.unique_id_from_tool, []).append(ef) + deduplicationLogger.debug(f"Found {len(existing_by_uid)} existing findings by unique IDs") + deduplicationLogger.debug(f"Found {len(existing_by_hash)} existing findings by hash codes") + return existing_by_uid, existing_by_hash + + +def find_candidates_for_deduplication_legacy(test, findings): + base_queryset = build_dedupe_scope_queryset(test) + titles = {f.title for f in findings if getattr(f, "title", None)} + cwes = {f.cwe for f in findings if getattr(f, "cwe", 0)} + cwes.discard(0) + if not titles and not cwes: + return {}, {} + + existing_qs = base_queryset.filter(Q(title__in=titles) | Q(cwe__in=cwes)).exclude(duplicate=True).order_by("id") + + by_title = {} + by_cwe = {} + for ef in existing_qs: + if ef.title: + by_title.setdefault(ef.title, []).append(ef) + if getattr(ef, "cwe", 0): + by_cwe.setdefault(ef.cwe, []).append(ef) + deduplicationLogger.debug(f"Found {len(by_title)} existing findings by title") + deduplicationLogger.debug(f"Found {len(by_cwe)} existing findings by CWE") + deduplicationLogger.debug(f"Found {len(existing_qs)} existing findings by title or CWE") + return by_title, by_cwe + + +def _is_candidate_older(new_finding, candidate): + # Ensure the newer finding is marked as duplicate of the older finding + is_older = candidate.id < new_finding.id + if not is_older: + deduplicationLogger.debug(f"candidate is newer than or equal to new finding: {new_finding.id} and candidate {candidate.id}") + return is_older + + +def match_hash_candidate(new_finding, candidates_by_hash): + if new_finding.hash_code is None: + return None + possible_matches = candidates_by_hash.get(new_finding.hash_code, []) + deduplicationLogger.debug(f"Finding {new_finding.id}: Found {len(possible_matches)} findings with same hash_code, ids={[(c.id, c.hash_code) for c in possible_matches]}") + + for candidate in possible_matches: + if not _is_candidate_older(new_finding, candidate): + continue + if is_deduplication_on_engagement_mismatch(new_finding, candidate): + deduplicationLogger.debug("deduplication_on_engagement_mismatch, skipping dedupe.") + continue + if are_endpoints_duplicates(new_finding, candidate): + return candidate + return None + + +def match_unique_id_candidate(new_finding, candidates_by_uid): + if new_finding.unique_id_from_tool is None: + return None + + possible_matches = candidates_by_uid.get(new_finding.unique_id_from_tool, []) + deduplicationLogger.debug(f"Finding {new_finding.id}: Found {len(possible_matches)} findings with same unique_id_from_tool, ids={[(c.id, c.unique_id_from_tool) for c in possible_matches]}") + for candidate in possible_matches: + if not _is_candidate_older(new_finding, candidate): + deduplicationLogger.debug("UID: newer candidate, skipping dedupe.") + continue + if is_deduplication_on_engagement_mismatch(new_finding, candidate): + deduplicationLogger.debug("deduplication_on_engagement_mismatch, skipping dedupe.") + continue + return candidate + return None + + +def match_uid_or_hash_candidate(new_finding, candidates_by_uid, candidates_by_hash): + # Combine UID 
and hash candidates and walk oldest-first + uid_list = candidates_by_uid.get(new_finding.unique_id_from_tool, []) if new_finding.unique_id_from_tool is not None else [] + hash_list = candidates_by_hash.get(new_finding.hash_code, []) if new_finding.hash_code is not None else [] + deduplicationLogger.debug("Finding %s: UID_OR_HASH: uid_list ids=%s hash_list ids=%s", new_finding.id, [c.id for c in uid_list], [c.id for c in hash_list]) + combined_by_id = {c.id: c for c in uid_list} + for c in hash_list: + combined_by_id.setdefault(c.id, c) + deduplicationLogger.debug("Finding %s: UID_OR_HASH: combined candidate ids (sorted)=%s", new_finding.id, sorted(combined_by_id.keys())) + for candidate_id in sorted(combined_by_id.keys()): + candidate = combined_by_id[candidate_id] + if not _is_candidate_older(new_finding, candidate): + continue + if is_deduplication_on_engagement_mismatch(new_finding, candidate): + deduplicationLogger.debug("deduplication_on_engagement_mismatch, skipping dedupe.") + continue + if are_endpoints_duplicates(new_finding, candidate): + deduplicationLogger.debug("UID_OR_HASH: endpoints match, returning candidate %s with test_type %s unique_id_from_tool %s hash_code %s", candidate.id, candidate.test.test_type, candidate.unique_id_from_tool, candidate.hash_code) + return candidate + deduplicationLogger.debug("UID_OR_HASH: endpoints mismatch, skipping candidate %s", candidate.id) + return None + + +def match_legacy_candidate(new_finding, candidates_by_title, candidates_by_cwe): + # --------------------------------------------------------- + # 1) Collects all the findings that have the same: + # (title and static_finding and dynamic_finding) + # or (CWE and static_finding and dynamic_finding) + # as the new one + # (this is "cond1") + # --------------------------------------------------------- + candidates = [] + if getattr(new_finding, "title", None): + candidates.extend(candidates_by_title.get(new_finding.title, [])) + if getattr(new_finding, "cwe", 0): + candidates.extend(candidates_by_cwe.get(new_finding.cwe, [])) + + for candidate in candidates: + if not _is_candidate_older(new_finding, candidate): + continue + if is_deduplication_on_engagement_mismatch(new_finding, candidate): + deduplicationLogger.debug( + "deduplication_on_engagement_mismatch, skipping dedupe.") + continue + + flag_endpoints = False + flag_line_path = False + + # --------------------------------------------------------- + # 2) If existing and new findings have endpoints: compare them all + # Else look at line+file_path + # (if new finding is not static, do not deduplicate) + # --------------------------------------------------------- + + if candidate.endpoints.count() != 0 and new_finding.endpoints.count() != 0: + list1 = [str(e) for e in new_finding.endpoints.all()] + list2 = [str(e) for e in candidate.endpoints.all()] + if all(x in list1 for x in list2): + deduplicationLogger.debug("%s: existing endpoints are present in new finding", candidate.id) + flag_endpoints = True + elif new_finding.static_finding and new_finding.file_path and len(new_finding.file_path) > 0: + if str(candidate.line) == str(new_finding.line) and candidate.file_path == new_finding.file_path: + deduplicationLogger.debug("%s: file_path and line match", candidate.id) + flag_line_path = True + else: + deduplicationLogger.debug("no endpoints on one of the findings and file_path doesn't match; Deduplication will not occur") + else: + deduplicationLogger.debug("find.static/dynamic: %s/%s", candidate.static_finding, 
candidate.dynamic_finding) + deduplicationLogger.debug("new_finding.static/dynamic: %s/%s", new_finding.static_finding, new_finding.dynamic_finding) + deduplicationLogger.debug("find.file_path: %s", candidate.file_path) + deduplicationLogger.debug("new_finding.file_path: %s", new_finding.file_path) + deduplicationLogger.debug("no endpoints on one of the findings and the new finding is either dynamic or doesn't have a file_path; Deduplication will not occur") + + flag_hash = candidate.hash_code == new_finding.hash_code + + deduplicationLogger.debug( + "deduplication flags for new finding (" + ("dynamic" if new_finding.dynamic_finding else "static") + ") " + str(new_finding.id) + " and existing finding " + str(candidate.id) + + " flag_endpoints: " + str(flag_endpoints) + " flag_line_path:" + str(flag_line_path) + " flag_hash:" + str(flag_hash)) + + if (flag_endpoints or flag_line_path) and flag_hash: + return candidate + return None + + +def _dedupe_batch_hash_code(findings): + if not findings: + return + test = findings[0].test + candidates_by_hash = find_candidates_for_deduplication_hash(test, findings) + if not candidates_by_hash: + return + for new_finding in findings: + deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_HASH_CODE") + match = match_hash_candidate(new_finding, candidates_by_hash) + if match: + try: + set_duplicate(new_finding, match) + except Exception as e: + deduplicationLogger.debug(str(e)) + + +def _dedupe_batch_unique_id(findings): + if not findings: + return + test = findings[0].test + candidates_by_uid = find_candidates_for_deduplication_unique_id(test, findings) + if not candidates_by_uid: + return + for new_finding in findings: + deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL") + match = match_unique_id_candidate(new_finding, candidates_by_uid) + if match: + try: + set_duplicate(new_finding, match) + except Exception as e: + deduplicationLogger.debug(str(e)) + + +def _dedupe_batch_uid_or_hash(findings): + if not findings: + return + + test = findings[0].test + candidates_by_uid, existing_by_hash = find_candidates_for_deduplication_uid_or_hash(test, findings) + if not (candidates_by_uid or existing_by_hash): + return + for new_finding in findings: + deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE") + if new_finding.duplicate: + continue + + match = match_uid_or_hash_candidate(new_finding, candidates_by_uid, existing_by_hash) + if match: + try: + set_duplicate(new_finding, match) + except Exception as e: + deduplicationLogger.debug(str(e)) + continue + + +def _dedupe_batch_legacy(findings): + if not findings: + return + test = findings[0].test + candidates_by_title, candidates_by_cwe = find_candidates_for_deduplication_legacy(test, findings) + if not (candidates_by_title or candidates_by_cwe): + return + for new_finding in findings: + deduplicationLogger.debug(f"deduplication start for finding {new_finding.id} with DEDUPE_ALGO_LEGACY") + match = match_legacy_candidate(new_finding, candidates_by_title, candidates_by_cwe) + if match: + try: + set_duplicate(new_finding, match) + except Exception as e: + deduplicationLogger.debug(str(e)) + + +def dedupe_batch_of_findings(findings, *args, **kwargs): + """Batch deduplicate a list of findings. 
The findings are assumed to be in the same test.""" + if not findings: + return + + enabled = System_Settings.objects.get().enable_deduplication + + if enabled: + # sort findings by id to ensure deduplication is deterministic/reproducible + findings = sorted(findings, key=attrgetter("id")) + + from dojo.utils import get_custom_method # noqa: PLC0415 -- circular import + if batch_dedupe_method := get_custom_method("FINDING_DEDUPE_BATCH_METHOD"): + return batch_dedupe_method(findings, *args, **kwargs) + + test = findings[0].test + dedup_alg = test.deduplication_algorithm + + if dedup_alg == settings.DEDUPE_ALGO_HASH_CODE: + logger.debug(f"deduplicating finding batch with DEDUPE_ALGO_HASH_CODE - {len(findings)} findings") + _dedupe_batch_hash_code(findings) + elif dedup_alg == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL: + logger.debug(f"deduplicating finding batch with DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL - {len(findings)} findings") + _dedupe_batch_unique_id(findings) + elif dedup_alg == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE: + logger.debug(f"deduplicating finding batch with DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE - {len(findings)} findings") + _dedupe_batch_uid_or_hash(findings) + else: + logger.debug(f"deduplicating finding batch with LEGACY - {len(findings)} findings") + _dedupe_batch_legacy(findings) + else: + deduplicationLogger.debug("dedupe: skipping dedupe because it's disabled in system settings get()") diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 95021e9575c..dc3cfdc7d13 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -17,6 +17,11 @@ from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id from dojo.endpoint.utils import save_endpoints_to_add from dojo.file_uploads.helper import delete_related_files +from dojo.finding.deduplication import ( + dedupe_batch_of_findings, + do_dedupe_finding, + get_finding_models_for_deduplication, +) from dojo.models import ( Endpoint, Endpoint_Status, @@ -35,7 +40,6 @@ from dojo.utils import ( calculate_grade, close_external_issue, - do_dedupe_finding, do_false_positive_history, get_current_user, mass_model_updater, @@ -457,6 +461,59 @@ def post_process_finding_save_internal(finding, dedupe_option=True, rules_option jira_helper.push_to_jira(finding.finding_group) +@dojo_async_task(signature=True) +@app.task +def post_process_findings_batch_signature(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True, + issue_updater_option=True, push_to_jira=False, user=None, **kwargs): + return post_process_findings_batch(finding_ids, *args, dedupe_option=dedupe_option, rules_option=rules_option, product_grading_option=product_grading_option, + issue_updater_option=issue_updater_option, push_to_jira=push_to_jira, user=user, **kwargs) + + +@dojo_async_task +@app.task +def post_process_findings_batch(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True, + issue_updater_option=True, push_to_jira=False, user=None, **kwargs): + + if not finding_ids: + return + + system_settings = System_Settings.objects.get() + + # load findings once with related objects prefetched (the helper evaluates the query into a list) + findings = get_finding_models_for_deduplication(finding_ids) + + if not findings: + logger.debug(f"no findings found for batch deduplication with IDs: {finding_ids}") + return + + # Batch dedupe with a single candidate query per algorithm + if dedupe_option and system_settings.enable_deduplication: + dedupe_batch_of_findings(findings) + + if system_settings.false_positive_history: + # Only 
perform false positive history if deduplication is disabled + if system_settings.enable_deduplication: + deduplicationLogger.warning("skipping false positive history because deduplication is also enabled") + else: + for finding in findings: + do_false_positive_history(finding, *args, **kwargs) + + # Non-status changing tasks + if issue_updater_option: + for finding in findings: + tool_issue_updater.async_tool_issue_update(finding) + + if product_grading_option and system_settings.enable_product_grade: + calculate_grade(findings[0].test.engagement.product) + + if push_to_jira: + for finding in findings: + if finding.has_jira_issue or not finding.finding_group: + jira_helper.push_to_jira(finding) + else: + jira_helper.push_to_jira(finding.finding_group) + + @receiver(pre_delete, sender=Finding) def finding_pre_delete(sender, instance, **kwargs): logger.debug("finding pre_delete: %d", instance.id) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 726e55717eb..c2692dafa82 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -1,5 +1,6 @@ import logging +from django.conf import settings from django.core.files.uploadedfile import TemporaryUploadedFile from django.core.serializers import serialize from django.db.models.query_utils import Q @@ -157,10 +158,9 @@ def process_findings( parsed_findings: list[Finding], **kwargs: dict, ) -> list[Finding]: - # Progressive batching for chord execution - post_processing_task_signatures = [] - current_batch_number = 1 - max_batch_size = 1024 + # Batched post-processing (no chord): dispatch a task per 1000 findings or on final finding + batch_finding_ids: list[int] = [] + batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000) """ Saves findings in memory that were parsed from the scan report into the database. 
@@ -233,32 +233,34 @@ def process_findings( finding = self.process_vulnerability_ids(finding) # Categorize this finding as a new one new_findings.append(finding) - # all data is already saved on the finding, we only need to trigger post processing - - # We create a signature for the post processing task so we can decide to apply it async or sync + # all data is already saved on the finding, we only need to trigger post processing in batches push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by) - post_processing_task_signature = finding_helper.post_process_finding_save_signature( - finding, - dedupe_option=True, - rules_option=True, - product_grading_option=False, - issue_updater_option=True, - push_to_jira=push_to_jira, - ) - - post_processing_task_signatures.append(post_processing_task_signature) - - # Check if we should launch a chord (batch full or end of findings) - if we_want_async(async_user=self.user) and post_processing_task_signatures: - post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord( - post_processing_task_signatures, - current_batch_number, - max_batch_size, - is_final_finding, - ) - else: - # Execute task immediately for synchronous processing - post_processing_task_signature() + batch_finding_ids.append(finding.id) + + # If batch is full or we're at the end, dispatch one batched task + if len(batch_finding_ids) >= batch_max_size or is_final_finding: + finding_ids_batch = list(batch_finding_ids) + batch_finding_ids.clear() + if we_want_async(async_user=self.user): + finding_helper.post_process_findings_batch_signature( + finding_ids_batch, + dedupe_option=True, + rules_option=True, + product_grading_option=True, + issue_updater_option=True, + push_to_jira=push_to_jira, + )() + else: + finding_helper.post_process_findings_batch( + finding_ids_batch, + dedupe_option=True, + rules_option=True, + product_grading_option=True, + issue_updater_option=True, + push_to_jira=push_to_jira, + ) + + # No chord: tasks are dispatched immediately above per batch for (group_name, findings) in group_names_to_findings_dict.items(): finding_helper.add_findings_to_auto_group( diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index f6c687c2f53..9307c76a31f 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -183,9 +183,7 @@ def process_findings( self.unchanged_items = [] self.group_names_to_findings_dict = {} # Progressive batching for chord execution - post_processing_task_signatures = [] - current_batch_number = 1 - max_batch_size = 1024 + # No chord: we dispatch per 1000 findings or on the final finding logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.") logger.debug("STEP 1: looping over findings from the reimported report and trying to match them to existing findings") @@ -205,6 +203,9 @@ def process_findings( continue cleaned_findings.append(sanitized) + batch_finding_ids: list[int] = [] + batch_max_size = 1000 + for idx, unsaved_finding in enumerate(cleaned_findings): is_final = idx == len(cleaned_findings) - 1 # Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report) @@ -255,31 +256,34 @@ def process_findings( finding, unsaved_finding, ) - # all data is already saved on the finding, we only need to trigger post processing - - # Execute post-processing task immediately if async, otherwise execute synchronously + # all 
data is already saved on the finding, we only need to trigger post processing in batches push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by) - - post_processing_task_signature = finding_helper.post_process_finding_save_signature( - finding, - dedupe_option=True, - rules_option=True, - product_grading_option=False, - issue_updater_option=True, - push_to_jira=push_to_jira, - ) - post_processing_task_signatures.append(post_processing_task_signature) - - # Check if we should launch a chord (batch full or end of findings) - if we_want_async(async_user=self.user) and post_processing_task_signatures: - post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord( - post_processing_task_signatures, - current_batch_number, - max_batch_size, - is_final, - ) - else: - post_processing_task_signature() + batch_finding_ids.append(finding.id) + + # If batch is full or we're at the end, dispatch one batched task + if len(batch_finding_ids) >= batch_max_size or is_final: + finding_ids_batch = list(batch_finding_ids) + batch_finding_ids.clear() + if we_want_async(async_user=self.user): + finding_helper.post_process_findings_batch_signature( + finding_ids_batch, + dedupe_option=True, + rules_option=True, + product_grading_option=True, + issue_updater_option=True, + push_to_jira=push_to_jira, + )() + else: + finding_helper.post_process_findings_batch( + finding_ids_batch, + dedupe_option=True, + rules_option=True, + product_grading_option=True, + issue_updater_option=True, + push_to_jira=push_to_jira, + ) + + # No chord: tasks are dispatched immediately above per batch self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items)) # due to #3958 we can have duplicates inside the same report diff --git a/dojo/management/commands/clear_celery_queue.py b/dojo/management/commands/clear_celery_queue.py new file mode 100644 index 00000000000..514d6892bfa --- /dev/null +++ b/dojo/management/commands/clear_celery_queue.py @@ -0,0 +1,115 @@ +import logging + +from django.core.management.base import BaseCommand + +from dojo.celery import app + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Clear (purge) all tasks from Celery queues" + + def add_arguments(self, parser): + parser.add_argument( + "--queue", + type=str, + help="Specific queue name to clear (default: all queues)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be cleared without actually clearing", + ) + parser.add_argument( + "--force", + action="store_true", + help="Skip confirmation prompt (use with caution)", + ) + + def handle(self, *args, **options): + queue_name = options["queue"] + dry_run = options["dry_run"] + force = options["force"] + + # Get connection to broker + with app.connection() as conn: + # Get all queues or specific queue + if queue_name: + queues = [queue_name] + self.stdout.write(f"Targeting queue: {queue_name}") + else: + # Get all active queues from the broker + inspector = app.control.inspect() + active_queues = inspector.active_queues() + if active_queues: + # Extract unique queue names from all workers + queues = set() + for worker_queues in active_queues.values(): + queues.update(queue_info["name"] for queue_info in worker_queues) + queues = list(queues) + else: + # Fallback: try common default queue + queues = ["celery"] + self.stdout.write(f"Found {len(queues)} queue(s) to process") + + if not queues: + 
self.stdout.write(self.style.WARNING("No queues found to clear")) + return + + # Show what will be cleared + total_purged = 0 + for queue in queues: + try: + # Get queue length using channel + with conn.channel() as channel: + _, message_count, _ = channel.queue_declare(queue=queue, passive=True) + except Exception as e: + logger.debug(f"Could not get message count for queue {queue}: {e}") + message_count = "unknown" + + if dry_run: + self.stdout.write( + self.style.WARNING(f" Would purge {message_count} messages from queue: {queue}"), + ) + else: + self.stdout.write(f" Queue '{queue}': {message_count} messages") + + if dry_run: + self.stdout.write(self.style.SUCCESS("\nDry run complete. Use without --dry-run to actually purge.")) + return + + # Confirmation prompt + if not force: + self.stdout.write( + self.style.WARNING( + f"\nThis will permanently delete all messages from {len(queues)} queue(s).", + ), + ) + confirm = input("Are you sure you want to continue? (yes/no): ") + if confirm.lower() not in {"yes", "y"}: + self.stdout.write(self.style.ERROR("Operation cancelled.")) + return + + # Purge queues using direct channel purge + self.stdout.write("\nPurging queues...") + for queue in queues: + try: + with conn.channel() as channel: + purged_count = channel.queue_purge(queue=queue) + total_purged += purged_count + self.stdout.write( + self.style.SUCCESS(f" ✓ Purged {purged_count} messages from queue: {queue}"), + ) + except Exception as e: + self.stdout.write( + self.style.ERROR(f" ✗ Failed to purge queue '{queue}': {e}"), + ) + logger.error(f"Error purging queue {queue}: {e}") + + if total_purged > 0: + self.stdout.write( + self.style.SUCCESS(f"\nSuccessfully purged {total_purged} message(s) from {len(queues)} queue(s)."), + ) + else: + self.stdout.write(self.style.WARNING("\nNo messages were purged (queues may have been empty).")) diff --git a/dojo/management/commands/dedupe.py b/dojo/management/commands/dedupe.py index 90e063c975f..a8e0a538cfe 100644 --- a/dojo/management/commands/dedupe.py +++ b/dojo/management/commands/dedupe.py @@ -1,12 +1,19 @@ import logging +from django.conf import settings from django.core.management.base import BaseCommand +from django.db.models import Prefetch +from dojo.finding.deduplication import ( + dedupe_batch_of_findings, + do_dedupe_batch_task, + do_dedupe_finding, + do_dedupe_finding_task, + get_finding_models_for_deduplication, +) from dojo.models import Finding, Product from dojo.utils import ( calculate_grade, - do_dedupe_finding, - do_dedupe_finding_task, get_system_setting, mass_model_updater, ) @@ -26,11 +33,11 @@ def generate_hash_code(f): class Command(BaseCommand): """ - Updates hash codes and/or runs deduplication for findings. Hashcode calculation always runs in the foreground, dedupe by default runs in the background. - Usage: manage.py dedupe [--parser "Parser1 Scan" --parser "Parser2 Scan"...] [--hash_code_only] [--dedupe_only] [--dedupe_sync]' + Updates hash codes and/or runs deduplication for findings. Hashcode calculation always runs in the foreground, dedupe by default runs in the background in batch mode. + Usage: manage.py dedupe [--parser "Parser1 Scan" --parser "Parser2 Scan"...] [--hash_code_only] [--dedupe_only] [--dedupe_sync] [--dedupe_batch_mode]' """ - help = 'Usage: manage.py dedupe [--parser "Parser1 Scan" --parser "Parser2 Scan"...] [--hash_code_only] [--dedupe_only] [--dedupe_sync]' + help = 'Usage: manage.py dedupe [--parser "Parser1 Scan" --parser "Parser2 Scan"...] 
[--hash_code_only] [--dedupe_only] [--dedupe_sync] [--dedupe_batch_mode]' def add_arguments(self, parser): parser.add_argument( @@ -43,28 +50,45 @@ def add_arguments(self, parser): parser.add_argument("--hash_code_only", action="store_true", help="Only compute hash codes") parser.add_argument("--dedupe_only", action="store_true", help="Only run deduplication") parser.add_argument("--dedupe_sync", action="store_true", help="Run dedupe in the foreground, default false") + parser.add_argument( + "--dedupe_batch_mode", + action="store_true", + default=True, + help="Deduplicate in batches (similar to import), works with both sync and async modes (default: True)", + ) def handle(self, *args, **options): restrict_to_parsers = options["parser"] hash_code_only = options["hash_code_only"] dedupe_only = options["dedupe_only"] dedupe_sync = options["dedupe_sync"] + dedupe_batch_mode = options.get("dedupe_batch_mode", True) # Default to True (batch mode enabled) if restrict_to_parsers is not None: - findings = Finding.objects.filter(test__test_type__name__in=restrict_to_parsers) + findings = Finding.objects.filter(test__test_type__name__in=restrict_to_parsers).exclude(duplicate=True) logger.info("######## Will process only parsers %s and %d findings ########", *restrict_to_parsers, findings.count()) else: # add filter on id to make counts not slow on mysql - findings = Finding.objects.all().filter(id__gt=0) + # exclude duplicates to avoid reprocessing findings that are already marked as duplicates + findings = Finding.objects.all().filter(id__gt=0).exclude(duplicate=True) logger.info("######## Will process the full database with %d findings ########", findings.count()) + # Prefetch related objects for synchronous deduplication + findings = findings.select_related( + "test", "test__engagement", "test__engagement__product", "test__test_type", + ).prefetch_related( + "endpoints", + Prefetch( + "original_finding", + queryset=Finding.objects.only("id", "duplicate_finding_id").order_by("-id"), + ), + ) + # Phase 1: update hash_codes without deduplicating if not dedupe_only: logger.info("######## Start Updating Hashcodes (foreground) ########") - # only prefetch here for hash_code calculation - finds = findings.prefetch_related("endpoints", "test__test_type") - mass_model_updater(Finding, finds, generate_hash_code, fields=["hash_code"], order="asc", log_prefix="hash_code computation ") + mass_model_updater(Finding, findings, generate_hash_code, fields=["hash_code"], order="asc", log_prefix="hash_code computation ") logger.info("######## Done Updating Hashcodes########") @@ -72,17 +96,72 @@ def handle(self, *args, **options): if not hash_code_only: if get_system_setting("enable_deduplication"): logger.info("######## Start deduplicating (%s) ########", ("foreground" if dedupe_sync else "background")) - if dedupe_sync: + if dedupe_batch_mode: + self._dedupe_batch_mode(findings, dedupe_sync=dedupe_sync) + elif dedupe_sync: mass_model_updater(Finding, findings, do_dedupe_finding, fields=None, order="desc", page_size=100, log_prefix="deduplicating ") else: # async tasks only need the id mass_model_updater(Finding, findings.only("id"), lambda f: do_dedupe_finding_task(f.id), fields=None, order="desc", log_prefix="deduplicating ") - # update the grading (if enabled) - logger.debug("Updating grades for products...") - for product in Product.objects.all(): - calculate_grade(product) + if dedupe_sync: + # update the grading (if enabled) and only useful in sync mode + # in async mode the background task that grades 
products every hour will pick it up + logger.debug("Updating grades for products...") + for product in Product.objects.all(): + calculate_grade(product) logger.info("######## Done deduplicating (%s) ########", ("foreground" if dedupe_sync else "tasks submitted to celery")) else: logger.debug("skipping dedupe because it's disabled in system settings") + + def _dedupe_batch_mode(self, findings_queryset, *, dedupe_sync: bool = True): + """ + Deduplicate findings in batches of max 1000 per test (similar to import process). + This is more efficient than processing findings one-by-one. + Can run synchronously or asynchronously. + """ + mode_str = "synchronous" if dedupe_sync else "asynchronous" + logger.info(f"######## Deduplicating in batch mode ({mode_str}) ########") + + batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000) + total_findings = findings_queryset.count() + logger.info(f"Processing {total_findings} findings in batches of max {batch_max_size} per test ({mode_str})") + + # Group findings by test_id to process them in batches per test + test_ids = findings_queryset.values_list("test_id", flat=True).distinct() + total_tests = len(test_ids) + total_processed = 0 + + for test_id in test_ids: + # Get finding IDs for this test (exclude duplicates to avoid reprocessing) + test_finding_ids = list(findings_queryset.filter(test_id=test_id).exclude(duplicate=True).values_list("id", flat=True)) + + if not test_finding_ids: + continue + + # Process findings for this test in batches of max batch_max_size + batch_finding_ids = [] + for idx, finding_id in enumerate(test_finding_ids): + is_final_finding_for_test = idx == len(test_finding_ids) - 1 + batch_finding_ids.append(finding_id) + + # If batch is full or we're at the end of this test's findings, process the batch + if len(batch_finding_ids) >= batch_max_size or is_final_finding_for_test: + if dedupe_sync: + # Synchronous: load findings and process immediately + batch_findings = get_finding_models_for_deduplication(batch_finding_ids) + logger.debug(f"Deduplicating batch of {len(batch_findings)} findings for test {test_id}") + dedupe_batch_of_findings(batch_findings) + else: + # Asynchronous: submit task with finding IDs + logger.debug(f"Submitting async batch task for {len(batch_finding_ids)} findings for test {test_id}") + do_dedupe_batch_task(batch_finding_ids) + + total_processed += len(batch_finding_ids) + batch_finding_ids = [] + + if total_processed % (batch_max_size * 10) == 0: + logger.info(f"Processed {total_processed}/{total_findings} findings") + + logger.info(f"######## Completed batch deduplication for {total_processed} findings across {total_tests} tests ({mode_str}) ########") diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index b435f4970b8..54a8923b3c4 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -268,6 +268,8 @@ DD_EDITABLE_MITIGATED_DATA=(bool, False), # new feature that tracks history across multiple reimports for the same test DD_TRACK_IMPORT_HISTORY=(bool, True), + # Batch size for import/reimport deduplication processing + DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE=(int, 1000), # Delete Auditlogs older than x month; -1 to keep all logs DD_AUDITLOG_FLUSH_RETENTION_PERIOD=(int, -1), # Batch size for flushing audit logs per task run @@ -1683,6 +1685,7 @@ def saml2_attrib_map_format(din): DISABLE_FINDING_MERGE = env("DD_DISABLE_FINDING_MERGE") TRACK_IMPORT_HISTORY = env("DD_TRACK_IMPORT_HISTORY") +IMPORT_REIMPORT_DEDUPE_BATCH_SIZE = 
env("DD_IMPORT_REIMPORT_DEDUPE_BATCH_SIZE") # ------------------------------------------------------------------------------ # JIRA diff --git a/dojo/utils.py b/dojo/utils.py index fc676e8d2cf..a00ba7b48f1 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -16,7 +16,6 @@ import bleach import crum import cvss -import hyperlink import vobject from asteval import Interpreter from auditlog.models import LogEntry @@ -237,353 +236,6 @@ def match_finding_to_existing_findings(finding, product=None, engagement=None, t return None -# true if both findings are on an engagement that have a different "deduplication on engagement" configuration -def is_deduplication_on_engagement_mismatch(new_finding, to_duplicate_finding): - return not new_finding.test.engagement.deduplication_on_engagement and to_duplicate_finding.test.engagement.deduplication_on_engagement - - -def get_endpoints_as_url(finding): - return [hyperlink.parse(str(e)) for e in finding.endpoints.all()] - - -def are_urls_equal(url1, url2, fields): - # Possible values are: scheme, host, port, path, query, fragment, userinfo, and user. - # For a details description see https://hyperlink.readthedocs.io/en/latest/api.html#attributes - deduplicationLogger.debug("Check if url %s and url %s are equal in terms of %s.", url1, url2, fields) - for field in fields: - if field == "scheme": - if url1.scheme != url2.scheme: - return False - elif field == "host": - if url1.host != url2.host: - return False - elif field == "port": - if url1.port != url2.port: - return False - elif field == "path": - if url1.path != url2.path: - return False - elif field == "query": - if url1.query != url2.query: - return False - elif field == "fragment": - if url1.fragment != url2.fragment: - return False - elif field == "userinfo": - if url1.userinfo != url2.userinfo: - return False - elif field == "user": - if url1.user != url2.user: - return False - else: - logger.warning("Field " + field + " is not supported by the endpoint dedupe algorithm, ignoring it.") - return True - - -def are_endpoints_duplicates(new_finding, to_duplicate_finding): - fields = settings.DEDUPE_ALGO_ENDPOINT_FIELDS - # shortcut if fields list is empty/feature is disabled - if len(fields) == 0: - deduplicationLogger.debug("deduplication by endpoint fields is disabled") - return True - - list1 = get_endpoints_as_url(new_finding) - list2 = get_endpoints_as_url(to_duplicate_finding) - - deduplicationLogger.debug(f"Starting deduplication by endpoint fields for finding {new_finding.id} with urls {list1} and finding {to_duplicate_finding.id} with urls {list2}") - if list1 == [] and list2 == []: - return True - - for l1 in list1: - for l2 in list2: - if are_urls_equal(l1, l2, fields): - return True - return False - - -@dojo_model_to_id -@dojo_async_task -@app.task -@dojo_model_from_id -def do_dedupe_finding_task(new_finding, *args, **kwargs): - return do_dedupe_finding(new_finding, *args, **kwargs) - - -def do_dedupe_finding(new_finding, *args, **kwargs): - if dedupe_method := get_custom_method("FINDING_DEDUPE_METHOD"): - return dedupe_method(new_finding, *args, **kwargs) - - try: - enabled = System_Settings.objects.get(no_cache=True).enable_deduplication - except System_Settings.DoesNotExist: - logger.warning("system settings not found") - enabled = False - if enabled: - deduplicationLogger.debug("dedupe for: " + str(new_finding.id) - + ":" + str(new_finding.title)) - deduplicationAlgorithm = new_finding.test.deduplication_algorithm - deduplicationLogger.debug("deduplication algorithm: " + 
deduplicationAlgorithm) - if deduplicationAlgorithm == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL: - deduplicate_unique_id_from_tool(new_finding) - elif deduplicationAlgorithm == settings.DEDUPE_ALGO_HASH_CODE: - deduplicate_hash_code(new_finding) - elif deduplicationAlgorithm == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE: - deduplicate_uid_or_hash_code(new_finding) - else: - deduplicationLogger.debug("no configuration per parser found; using legacy algorithm") - deduplicate_legacy(new_finding) - else: - deduplicationLogger.debug("dedupe: skipping dedupe because it's disabled in system settings get()") - return None - - -def deduplicate_legacy(new_finding): - # --------------------------------------------------------- - # 1) Collects all the findings that have the same: - # (title and static_finding and dynamic_finding) - # or (CWE and static_finding and dynamic_finding) - # as the new one - # (this is "cond1") - # --------------------------------------------------------- - if new_finding.test.engagement.deduplication_on_engagement: - eng_findings_cwe = Finding.objects.filter( - test__engagement=new_finding.test.engagement, - cwe=new_finding.cwe).exclude(id=new_finding.id).exclude(cwe=0).exclude(duplicate=True).values("id") - eng_findings_title = Finding.objects.filter( - test__engagement=new_finding.test.engagement, - title=new_finding.title).exclude(id=new_finding.id).exclude(duplicate=True).values("id") - else: - eng_findings_cwe = Finding.objects.filter( - test__engagement__product=new_finding.test.engagement.product, - cwe=new_finding.cwe).exclude(id=new_finding.id).exclude(cwe=0).exclude(duplicate=True).values("id") - eng_findings_title = Finding.objects.filter( - test__engagement__product=new_finding.test.engagement.product, - title=new_finding.title).exclude(id=new_finding.id).exclude(duplicate=True).values("id") - - total_findings = Finding.objects.filter(Q(id__in=eng_findings_cwe) | Q(id__in=eng_findings_title)).prefetch_related("endpoints", "test", "test__engagement", "found_by", "original_finding", "test__test_type") - deduplicationLogger.debug("Found " - + str(len(eng_findings_cwe)) + " findings with same cwe, " - + str(len(eng_findings_title)) + " findings with same title: " - + str(len(total_findings)) + " findings with either same title or same cwe") - - # total_findings = total_findings.order_by('date') - for find in total_findings.order_by("id"): - flag_endpoints = False - flag_line_path = False - flag_hash = False - if is_deduplication_on_engagement_mismatch(new_finding, find): - deduplicationLogger.debug( - "deduplication_on_engagement_mismatch, skipping dedupe.") - continue - - # --------------------------------------------------------- - # 2) If existing and new findings have endpoints: compare them all - # Else look at line+file_path - # (if new finding is not static, do not deduplicate) - # --------------------------------------------------------- - - if find.endpoints.count() != 0 and new_finding.endpoints.count() != 0: - list1 = [str(e) for e in new_finding.endpoints.all()] - list2 = [str(e) for e in find.endpoints.all()] - - if all(x in list1 for x in list2): - deduplicationLogger.debug("%s: existing endpoints are present in new finding", find.id) - flag_endpoints = True - elif new_finding.static_finding and new_finding.file_path and len(new_finding.file_path) > 0: - if str(find.line) == str(new_finding.line) and find.file_path == new_finding.file_path: - deduplicationLogger.debug("%s: file_path and line match", find.id) - flag_line_path = True - else: - 
deduplicationLogger.debug("no endpoints on one of the findings and file_path doesn't match; Deduplication will not occur") - else: - deduplicationLogger.debug("find.static/dynamic: %s/%s", find.static_finding, find.dynamic_finding) - deduplicationLogger.debug("new_finding.static/dynamic: %s/%s", new_finding.static_finding, new_finding.dynamic_finding) - deduplicationLogger.debug("find.file_path: %s", find.file_path) - deduplicationLogger.debug("new_finding.file_path: %s", new_finding.file_path) - - deduplicationLogger.debug("no endpoints on one of the findings and the new finding is either dynamic or doesn't have a file_path; Deduplication will not occur") - - if find.hash_code == new_finding.hash_code: - flag_hash = True - - deduplicationLogger.debug( - "deduplication flags for new finding (" + ("dynamic" if new_finding.dynamic_finding else "static") + ") " + str(new_finding.id) + " and existing finding " + str(find.id) - + " flag_endpoints: " + str(flag_endpoints) + " flag_line_path:" + str(flag_line_path) + " flag_hash:" + str(flag_hash)) - - # --------------------------------------------------------- - # 3) Findings are duplicate if (cond1 is true) and they have the same: - # hash - # and (endpoints or (line and file_path) - # --------------------------------------------------------- - if ((flag_endpoints or flag_line_path) and flag_hash): - try: - set_duplicate(new_finding, find) - except Exception as e: - deduplicationLogger.debug(str(e)) - continue - - break - - -def deduplicate_unique_id_from_tool(new_finding): - if new_finding.test.engagement.deduplication_on_engagement: - existing_findings = Finding.objects.filter( - test__engagement=new_finding.test.engagement, - # the unique_id_from_tool is unique for a given tool: do not compare with other tools - test__test_type=new_finding.test.test_type, - unique_id_from_tool=new_finding.unique_id_from_tool).exclude( - id=new_finding.id).exclude( - unique_id_from_tool=None).exclude( - duplicate=True).order_by("id") - else: - existing_findings = Finding.objects.filter( - test__engagement__product=new_finding.test.engagement.product, - # the unique_id_from_tool is unique for a given tool: do not compare with other tools - test__test_type=new_finding.test.test_type, - unique_id_from_tool=new_finding.unique_id_from_tool).exclude( - id=new_finding.id).exclude( - unique_id_from_tool=None).exclude( - duplicate=True).order_by("id") - - deduplicationLogger.debug("Found " - + str(len(existing_findings)) + " findings with same unique_id_from_tool") - for find in existing_findings: - if is_deduplication_on_engagement_mismatch(new_finding, find): - deduplicationLogger.debug( - "deduplication_on_engagement_mismatch, skipping dedupe.") - continue - try: - set_duplicate(new_finding, find) - break - except Exception as e: - deduplicationLogger.debug(str(e)) - continue - - -def deduplicate_hash_code(new_finding): - if new_finding.test.engagement.deduplication_on_engagement: - existing_findings = Finding.objects.filter( - test__engagement=new_finding.test.engagement, - hash_code=new_finding.hash_code).exclude( - id=new_finding.id).exclude( - hash_code=None).exclude( - duplicate=True).order_by("id") - else: - existing_findings = Finding.objects.filter( - test__engagement__product=new_finding.test.engagement.product, - hash_code=new_finding.hash_code).exclude( - id=new_finding.id).exclude( - hash_code=None).exclude( - duplicate=True).order_by("id") - - deduplicationLogger.debug("Found " - + str(len(existing_findings)) + " findings with same hash_code") - for 
find in existing_findings: - if is_deduplication_on_engagement_mismatch(new_finding, find): - deduplicationLogger.debug( - "deduplication_on_engagement_mismatch, skipping dedupe.") - continue - try: - if are_endpoints_duplicates(new_finding, find): - set_duplicate(new_finding, find) - break - except Exception as e: - deduplicationLogger.debug(str(e)) - continue - - -def deduplicate_uid_or_hash_code(new_finding): - if new_finding.test.engagement.deduplication_on_engagement: - existing_findings = Finding.objects.filter( - (Q(hash_code__isnull=False) & Q(hash_code=new_finding.hash_code)) - # unique_id_from_tool can only apply to the same test_type because it is parser dependent - | (Q(unique_id_from_tool__isnull=False) & Q(unique_id_from_tool=new_finding.unique_id_from_tool) & Q(test__test_type=new_finding.test.test_type)), - test__engagement=new_finding.test.engagement).exclude( - id=new_finding.id).exclude( - duplicate=True).order_by("id") - else: - # same without "test__engagement=new_finding.test.engagement" condition - existing_findings = Finding.objects.filter( - (Q(hash_code__isnull=False) & Q(hash_code=new_finding.hash_code)) - | (Q(unique_id_from_tool__isnull=False) & Q(unique_id_from_tool=new_finding.unique_id_from_tool) & Q(test__test_type=new_finding.test.test_type)), - test__engagement__product=new_finding.test.engagement.product).exclude( - id=new_finding.id).exclude( - duplicate=True).order_by("id") - deduplicationLogger.debug("Found " - + str(len(existing_findings)) + " findings with either the same unique_id_from_tool or hash_code: " + str([find.id for find in existing_findings])) - for find in existing_findings: - if is_deduplication_on_engagement_mismatch(new_finding, find): - deduplicationLogger.debug( - "deduplication_on_engagement_mismatch, skipping dedupe.") - continue - try: - if are_endpoints_duplicates(new_finding, find): - set_duplicate(new_finding, find) - break - except Exception as e: - deduplicationLogger.debug(str(e)) - continue - - -def set_duplicate(new_finding, existing_finding): - deduplicationLogger.debug(f"new_finding.status(): {new_finding.id} {new_finding.status()}") - deduplicationLogger.debug(f"existing_finding.status(): {existing_finding.id} {existing_finding.status()}") - if existing_finding.duplicate: - deduplicationLogger.debug("existing finding: %s:%s:duplicate=%s;duplicate_finding=%s", existing_finding.id, existing_finding.title, existing_finding.duplicate, existing_finding.duplicate_finding.id if existing_finding.duplicate_finding else "None") - msg = "Existing finding is a duplicate" - raise Exception(msg) - if existing_finding.id == new_finding.id: - msg = "Can not add duplicate to itself" - raise Exception(msg) - if is_duplicate_reopen(new_finding, existing_finding): - msg = "Found a regression. 
Ignore this so that a new duplicate chain can be made" - raise Exception(msg) - if new_finding.duplicate and finding_mitigated(existing_finding): - msg = "Skip this finding as we do not want to attach a new duplicate to a mitigated finding" - raise Exception(msg) - - deduplicationLogger.debug("Setting new finding " + str(new_finding.id) + " as a duplicate of existing finding " + str(existing_finding.id)) - new_finding.duplicate = True - new_finding.active = False - new_finding.verified = False - new_finding.duplicate_finding = existing_finding - - # Make sure transitive duplication is flattened - # if A -> B and B is made a duplicate of C here, aferwards: - # A -> C and B -> C should be true - for find in new_finding.original_finding.all().order_by("-id"): - new_finding.original_finding.remove(find) - set_duplicate(find, existing_finding) - existing_finding.found_by.add(new_finding.test.test_type) - logger.debug("saving new finding: %d", new_finding.id) - super(Finding, new_finding).save() - logger.debug("saving existing finding: %d", existing_finding.id) - super(Finding, existing_finding).save() - - -def is_duplicate_reopen(new_finding, existing_finding) -> bool: - return finding_mitigated(existing_finding) and finding_not_human_set_status(existing_finding) and not finding_mitigated(new_finding) - - -def finding_mitigated(finding: Finding) -> bool: - return finding.active is False and (finding.is_mitigated is True or finding.mitigated is not None) - - -def finding_not_human_set_status(finding: Finding) -> bool: - return finding.out_of_scope is False and finding.false_p is False - - -def set_duplicate_reopen(new_finding, existing_finding): - logger.debug("duplicate reopen existing finding") - existing_finding.mitigated = new_finding.mitigated - existing_finding.is_mitigated = new_finding.is_mitigated - existing_finding.active = new_finding.active - existing_finding.verified = new_finding.verified - existing_finding.notes.create(author=existing_finding.reporter, - entry="This finding has been automatically re-opened as it was found in recent scans.") - existing_finding.save() - - def count_findings(findings: QuerySet) -> tuple[dict["Product", list[int]], dict[str, int]]: agg = ( findings.values(prod_id=F("test__engagement__product_id")) diff --git a/unittests/test_duplication_loops.py b/unittests/test_duplication_loops.py index d85e52e1046..9a84024e560 100644 --- a/unittests/test_duplication_loops.py +++ b/unittests/test_duplication_loops.py @@ -3,9 +3,9 @@ from crum import impersonate from django.test.utils import override_settings +from dojo.finding.deduplication import set_duplicate from dojo.management.commands.fix_loop_duplicates import fix_loop_duplicates from dojo.models import Engagement, Finding, Product, User, copy_model_util -from dojo.utils import set_duplicate from .dojo_test_case import DojoTestCase diff --git a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index 38d63babad1..c0da1e213c7 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -178,11 +178,11 @@ def test_import_reimport_reimport_performance_async(self): self._import_reimport_performance( expected_num_queries1=340, - expected_num_async_tasks1=10, + expected_num_async_tasks1=7, expected_num_queries2=288, - expected_num_async_tasks2=22, + expected_num_async_tasks2=18, expected_num_queries3=175, - expected_num_async_tasks3=20, + expected_num_async_tasks3=17, ) @override_settings(ENABLE_AUDITLOG=True, AUDITLOG_TYPE="django-pghistory") 
@@ -196,11 +196,11 @@ def test_import_reimport_reimport_performance_pghistory_async(self):
 
         self._import_reimport_performance(
             expected_num_queries1=306,
-            expected_num_async_tasks1=10,
+            expected_num_async_tasks1=7,
             expected_num_queries2=281,
-            expected_num_async_tasks2=22,
+            expected_num_async_tasks2=18,
             expected_num_queries3=170,
-            expected_num_async_tasks3=20,
+            expected_num_async_tasks3=17,
         )
 
     @override_settings(ENABLE_AUDITLOG=True, AUDITLOG_TYPE="django-auditlog")
@@ -219,12 +219,12 @@ def test_import_reimport_reimport_performance_no_async(self):
         testuser.usercontactinfo.block_execution = True
         testuser.usercontactinfo.save()
         self._import_reimport_performance(
-            expected_num_queries1=350,
-            expected_num_async_tasks1=10,
-            expected_num_queries2=305,
-            expected_num_async_tasks2=22,
-            expected_num_queries3=190,
-            expected_num_async_tasks3=20,
+            expected_num_queries1=345,
+            expected_num_async_tasks1=6,
+            expected_num_queries2=293,
+            expected_num_async_tasks2=17,
+            expected_num_queries3=180,
+            expected_num_async_tasks3=16,
         )
 
     @override_settings(ENABLE_AUDITLOG=True, AUDITLOG_TYPE="django-pghistory")
@@ -241,12 +241,12 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self):
         testuser.usercontactinfo.save()
 
         self._import_reimport_performance(
-            expected_num_queries1=316,
-            expected_num_async_tasks1=10,
-            expected_num_queries2=298,
-            expected_num_async_tasks2=22,
-            expected_num_queries3=185,
-            expected_num_async_tasks3=20,
+            expected_num_queries1=311,
+            expected_num_async_tasks1=6,
+            expected_num_queries2=286,
+            expected_num_async_tasks2=17,
+            expected_num_queries3=175,
+            expected_num_async_tasks3=16,
         )
 
     @override_settings(ENABLE_AUDITLOG=True, AUDITLOG_TYPE="django-auditlog")
@@ -267,12 +267,12 @@ def test_import_reimport_reimport_performance_no_async_with_product_grading(self
         self.system_settings(enable_product_grade=True)
 
         self._import_reimport_performance(
-            expected_num_queries1=351,
-            expected_num_async_tasks1=11,
-            expected_num_queries2=306,
-            expected_num_async_tasks2=23,
-            expected_num_queries3=191,
-            expected_num_async_tasks3=21,
+            expected_num_queries1=347,
+            expected_num_async_tasks1=8,
+            expected_num_queries2=295,
+            expected_num_async_tasks2=19,
+            expected_num_queries3=182,
+            expected_num_async_tasks3=18,
         )
 
     @override_settings(ENABLE_AUDITLOG=True, AUDITLOG_TYPE="django-pghistory")
@@ -290,12 +290,12 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr
         self.system_settings(enable_product_grade=True)
 
         self._import_reimport_performance(
-            expected_num_queries1=317,
-            expected_num_async_tasks1=11,
-            expected_num_queries2=299,
-            expected_num_async_tasks2=23,
-            expected_num_queries3=186,
-            expected_num_async_tasks3=21,
+            expected_num_queries1=313,
+            expected_num_async_tasks1=8,
+            expected_num_queries2=288,
+            expected_num_async_tasks2=19,
+            expected_num_queries3=177,
+            expected_num_async_tasks3=18,
         )
 
     # Deduplication is enabled in the tests above, but to properly test it we must run the same import twice and capture the results.
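The context comment above describes how deduplication itself is verified: the same report is imported twice, and the second pass should produce nothing but duplicates of the first. A minimal sketch of that check follows; "import_scan" is a hypothetical stand-in for whatever helper the suite uses to run an import, and the suite's real assertions presumably live behind _deduplication_performance and its check_duplicates flag.

# Illustrative sketch only; "import_scan" is a hypothetical helper that runs an
# import and returns the created Test object.
from dojo.models import Finding


def assert_second_import_deduplicates(import_scan, scan_file, engagement):
    # First import creates the original findings.
    first_test = import_scan(scan_file, engagement=engagement)
    originals = Finding.objects.filter(test=first_test)

    # Importing the identical report again should only add duplicates.
    second_test = import_scan(scan_file, engagement=engagement)
    second_pass = Finding.objects.filter(test=second_test)

    assert not originals.filter(duplicate=True).exists()
    assert second_pass.count() == originals.count()
    # Every finding from the second pass points back at an original finding.
    assert all(f.duplicate and f.duplicate_finding_id is not None for f in second_pass)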
@@ -414,9 +414,9 @@ def test_deduplication_performance_async(self):
 
         self._deduplication_performance(
             expected_num_queries1=311,
-            expected_num_async_tasks1=12,
+            expected_num_async_tasks1=8,
             expected_num_queries2=204,
-            expected_num_async_tasks2=12,
+            expected_num_async_tasks2=8,
             check_duplicates=False,  # Async mode - deduplication happens later
         )
 
@@ -431,9 +431,9 @@ def test_deduplication_performance_pghistory_async(self):
 
         self._deduplication_performance(
             expected_num_queries1=275,
-            expected_num_async_tasks1=12,
+            expected_num_async_tasks1=8,
             expected_num_queries2=185,
-            expected_num_async_tasks2=12,
+            expected_num_async_tasks2=8,
             check_duplicates=False,  # Async mode - deduplication happens later
         )
 
@@ -451,10 +451,10 @@ def test_deduplication_performance_no_async(self):
         testuser.usercontactinfo.save()
 
         self._deduplication_performance(
-            expected_num_queries1=323,
-            expected_num_async_tasks1=12,
-            expected_num_queries2=318,
-            expected_num_async_tasks2=12,
+            expected_num_queries1=316,
+            expected_num_async_tasks1=7,
+            expected_num_queries2=287,
+            expected_num_async_tasks2=7,
         )
 
     @override_settings(ENABLE_AUDITLOG=True, AUDITLOG_TYPE="django-pghistory")
@@ -471,8 +471,8 @@ def test_deduplication_performance_pghistory_no_async(self):
         testuser.usercontactinfo.save()
 
         self._deduplication_performance(
-            expected_num_queries1=287,
-            expected_num_async_tasks1=12,
-            expected_num_queries2=281,
-            expected_num_async_tasks2=12,
+            expected_num_queries1=280,
+            expected_num_async_tasks1=7,
+            expected_num_queries2=250,
+            expected_num_async_tasks2=7,
         )
diff --git a/unittests/test_utils_deduplication_reopen.py b/unittests/test_utils_deduplication_reopen.py
index a7e72ede118..2981222d591 100644
--- a/unittests/test_utils_deduplication_reopen.py
+++ b/unittests/test_utils_deduplication_reopen.py
@@ -1,9 +1,9 @@
 import datetime
 import logging
 
+from dojo.finding.deduplication import set_duplicate
 from dojo.management.commands.fix_loop_duplicates import fix_loop_duplicates
 from dojo.models import Finding, copy_model_util
-from dojo.utils import set_duplicate
 
 from .dojo_test_case import DojoTestCase
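As the import changes in these test modules show, set_duplicate is now imported from dojo.finding.deduplication instead of dojo.utils. A minimal sketch of how an out-of-tree caller adapts; the mark_as_duplicate wrapper and its lookups are hypothetical, and only the new import path plus the raise-on-invalid-pairing behaviour come from the changes above.

# Hypothetical caller, shown for illustration only.
from dojo.finding.deduplication import set_duplicate
from dojo.models import Finding


def mark_as_duplicate(new_finding_id: int, existing_finding_id: int) -> bool:
    new_finding = Finding.objects.get(id=new_finding_id)
    existing_finding = Finding.objects.get(id=existing_finding_id)
    try:
        # set_duplicate raises when the existing finding is itself a duplicate,
        # when both IDs refer to the same finding, when a regression should start
        # a new duplicate chain, or when a new duplicate would be attached to a
        # mitigated finding.
        set_duplicate(new_finding, existing_finding)
    except Exception:
        return False
    return True

Callers that iterate over several candidate findings can keep the catch-log-and-continue pattern used by the relocated deduplicate_* helpers.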