|
1 | 1 | import logging |
2 | 2 |
|
| 3 | +from django.conf import settings |
3 | 4 | from django.core.management.base import BaseCommand |
4 | 5 | from django.db.models import Prefetch |
5 | 6 |
|
@@ -116,37 +117,51 @@ def handle(self, *args, **options): |
116 | 117 |
|
117 | 118 | def _dedupe_batch_mode(self, findings_queryset, *, dedupe_sync: bool = True): |
118 | 119 | """ |
119 | | - Deduplicate findings in batches, grouped by test (similar to import process). |
| 120 | + Deduplicate findings per test, in batches capped by settings.IMPORT_REIMPORT_DEDUPE_BATCH_SIZE (default 1000), similar to the import process. |
120 | 121 | This is more efficient than processing findings one-by-one. |
121 | 122 | Can run synchronously or asynchronously. |
122 | 123 | """ |
123 | 124 | mode_str = "synchronous" if dedupe_sync else "asynchronous" |
124 | 125 | logger.info(f"######## Deduplicating in batch mode ({mode_str}) ########") |
125 | 126 |
|
126 | | - # Group findings by test_id to process them in batches |
127 | | - # We need to get test_ids first to group properly |
| 127 | + batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000) |
| 128 | + total_findings = findings_queryset.count() |
| 129 | + logger.info(f"Processing {total_findings} findings in batches of max {batch_max_size} per test ({mode_str})") |
| 130 | + |
| 131 | + # Group findings by test_id to process them in batches per test |
128 | 132 | test_ids = findings_queryset.values_list("test_id", flat=True).distinct() |
129 | 133 | total_tests = len(test_ids) |
130 | | - logger.info(f"Processing {total_tests} tests in batch mode ({mode_str})") |
| 134 | + total_processed = 0 |
131 | 135 |
|
132 | | - processed_count = 0 |
133 | 136 | for test_id in test_ids: |
134 | 137 | # Get finding IDs for this test (exclude duplicates to avoid reprocessing) |
135 | 138 | test_finding_ids = list(findings_queryset.filter(test_id=test_id).exclude(duplicate=True).values_list("id", flat=True)) |
136 | 139 |
|
137 | | - if test_finding_ids: |
138 | | - if dedupe_sync: |
139 | | - # Synchronous: load findings and process immediately |
140 | | - test_findings = get_finding_models_for_deduplication(test_finding_ids) |
141 | | - logger.debug(f"Deduplicating batch of {len(test_findings)} findings for test {test_id}") |
142 | | - dedupe_batch_of_findings(test_findings) |
143 | | - else: |
144 | | - # Asynchronous: submit task with finding IDs |
145 | | - logger.debug(f"Submitting async batch task for {len(test_finding_ids)} findings for test {test_id}") |
146 | | - do_dedupe_batch_task(test_finding_ids) |
147 | | - |
148 | | - processed_count += 1 |
149 | | - if processed_count % 10 == 0: |
150 | | - logger.info(f"Processed {processed_count}/{total_tests} tests") |
151 | | - |
152 | | - logger.info(f"######## Completed batch deduplication for {processed_count} tests ({mode_str}) ########") |
| 140 | + if not test_finding_ids: |
| 141 | + continue |
| 142 | + |
| 143 | + # Process findings for this test in batches of max batch_max_size |
| 144 | + batch_finding_ids = [] |
| 145 | + for idx, finding_id in enumerate(test_finding_ids): |
| 146 | + is_final_finding_for_test = idx == len(test_finding_ids) - 1 |
| 147 | + batch_finding_ids.append(finding_id) |
| 148 | + |
| 149 | + # If batch is full or we're at the end of this test's findings, process the batch |
| 150 | + if len(batch_finding_ids) >= batch_max_size or is_final_finding_for_test: |
| 151 | + if dedupe_sync: |
| 152 | + # Synchronous: load findings and process immediately |
| 153 | + batch_findings = get_finding_models_for_deduplication(batch_finding_ids) |
| 154 | + logger.debug(f"Deduplicating batch of {len(batch_findings)} findings for test {test_id}") |
| 155 | + dedupe_batch_of_findings(batch_findings) |
| 156 | + else: |
| 157 | + # Asynchronous: submit task with finding IDs |
| 158 | + logger.debug(f"Submitting async batch task for {len(batch_finding_ids)} findings for test {test_id}") |
| 159 | + do_dedupe_batch_task(batch_finding_ids) |
| 160 | + |
| 161 | + total_processed += len(batch_finding_ids) |
| 162 | + # Log progress when this batch crosses a multiple of 10 full batches |
| 163 | + if total_processed % (batch_max_size * 10) < len(batch_finding_ids): |
| 164 | + logger.info(f"Processed {total_processed}/{total_findings} findings") |
| 165 | + batch_finding_ids = [] |
| 166 | + |
| 167 | + logger.info(f"######## Completed batch deduplication for {total_processed} findings across {total_tests} tests ({mode_str}) ########") |
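A minimal runnable sketch of the chunking pattern this hunk introduces, using fabricated data and a hypothetical `dedupe_batch` stand-in for `dedupe_batch_of_findings` / `do_dedupe_batch_task`; only the per-test batching and the progress-crossing check mirror the command:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

BATCH_MAX_SIZE = 3  # stand-in for settings.IMPORT_REIMPORT_DEDUPE_BATCH_SIZE

def dedupe_batch(finding_ids):
    # Hypothetical stand-in for dedupe_batch_of_findings / do_dedupe_batch_task.
    logger.debug("deduplicating %d findings", len(finding_ids))

def dedupe_in_batches(findings_by_test):
    total_findings = sum(len(ids) for ids in findings_by_test.values())
    total_processed = 0
    log_every = BATCH_MAX_SIZE * 10
    for test_id, finding_ids in findings_by_test.items():
        batch = []
        for idx, finding_id in enumerate(finding_ids):
            batch.append(finding_id)
            # Flush when the batch is full or this test's findings are
            # exhausted, so partial batches never leak across tests.
            if len(batch) >= BATCH_MAX_SIZE or idx == len(finding_ids) - 1:
                dedupe_batch(batch)
                total_processed += len(batch)
                # Fires exactly when this batch crosses a multiple of log_every.
                if total_processed % log_every < len(batch):
                    logger.info("Processed %d/%d findings", total_processed, total_findings)
                batch = []
    return total_processed

if __name__ == "__main__":
    findings_by_test = {101: list(range(7)), 102: list(range(4))}
    assert dedupe_in_batches(findings_by_test) == 11
```

Flushing on the last index of each test, rather than carrying a remainder into the next test, keeps every dedupe batch scoped to a single test, matching how the import process groups findings.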
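Since the cap is read with `getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000)`, deployments can tune it from `settings.py`; the value below is illustrative only:

```python
# settings.py -- illustrative override; the command falls back to 1000
# via getattr() when this setting is not defined.
IMPORT_REIMPORT_DEDUPE_BATCH_SIZE = 500
```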