diff --git a/turbinia/evidence.py b/turbinia/evidence.py
index fbe605c9f..4e1391897 100644
--- a/turbinia/evidence.py
+++ b/turbinia/evidence.py
@@ -67,9 +67,10 @@ def evidence_class_names(all_classes=False):
     # Ignore classes that are not real Evidence types and the base class.
     ignored_classes = (
         'BinaryExtraction', 'BulkExtractorOutput', 'Evidence', 'EvidenceState',
-        'EvidenceCollection', 'ExportedFileArtifact', 'FilteredTextFile',
-        'FinalReport', 'IntEnum', 'PlasoCsvFile', 'PhotorecOutput',
-        'ReportText', 'TextFile', 'VolatilityReport', 'TurbiniaException')
+        'EvidenceCollection', 'ExportedFileArtifact', 'ExportedFileArtifactLLM',
+        'FilteredTextFile', 'FinalReport', 'IntEnum', 'PlasoCsvFile',
+        'PhotorecOutput', 'ReportText', 'TextFile', 'VolatilityReport',
+        'TurbiniaException')
     class_names = filter(
         lambda class_tuple: class_tuple[0] not in ignored_classes, class_names)
   return list(class_names)
@@ -1147,6 +1148,18 @@ def __init__(self, artifact_name=None, *args, **kwargs):
     self.copyable = True
 
 
+class ExportedFileArtifactLLM(Evidence):
+  """Exported file artifact for LLM analysis."""
+
+  REQUIRED_ATTRIBUTES = ['artifact_name']
+
+  def __init__(self, artifact_name=None, *args, **kwargs):
+    """Initializes an exported file artifact."""
+    super(ExportedFileArtifactLLM, self).__init__(*args, **kwargs)
+    self.artifact_name = artifact_name
+    self.copyable = True
+
+
 class VolatilityReport(TextFile):
   """Volatility output file data."""
   pass
diff --git a/turbinia/jobs/llm_artifacts_analyzer.py b/turbinia/jobs/llm_artifacts_analyzer.py
index e24378b56..6a0f3c08c 100644
--- a/turbinia/jobs/llm_artifacts_analyzer.py
+++ b/turbinia/jobs/llm_artifacts_analyzer.py
@@ -104,9 +104,13 @@ def create_tasks(self, evidence):
     """
     tasks = []
     for artifact_name in LLM_ARTIFACTS:
+      # To avoid redundant processing between the LLM analyzer and other
+      # analyzers using the same evidence type, the LLM analyzer uses the
+      # evidence type `ExportedFileArtifactLLM`, produced by
+      # FileArtifactExtractionTask when llm_artifact=True.
       tasks.extend([
-          workers.artifact.FileArtifactExtractionTask(artifact_name)
-          for _ in evidence
+          workers.artifact.FileArtifactExtractionTask(
+              artifact_name=artifact_name, llm_artifact=True) for _ in evidence
       ])
     return tasks
 
@@ -114,7 +118,11 @@ def create_tasks(self, evidence):
 class LLMAnalysisJob(interface.TurbiniaJob):
   """LLM analysis job for selected history, logs and config files."""
 
-  evidence_input = [evidence_module.ExportedFileArtifact]
+  # To avoid redundant processing between the LLM analyzer and other
+  # analyzers using the same evidence type, the LLM analyzer uses a
+  # separate evidence type produced by FileArtifactExtractionTask when
+  # llm_artifact=True.
+  evidence_input = [evidence_module.ExportedFileArtifactLLM]
   evidence_output = [evidence_module.ReportText]
 
   NAME = 'LLMAnalysisJob'
@@ -128,9 +136,7 @@ def create_tasks(self, evidence):
     Returns:
      A list of tasks to schedule.
    """
-    evidence = [
-        e for e in list(set(evidence)) if e.artifact_name in LLM_ARTIFACTS
-    ]
+    evidence = [e for e in evidence if e.artifact_name in LLM_ARTIFACTS]
     return [llm_analyzer_module.LLMAnalyzerTask() for _ in evidence]
 
 
diff --git a/turbinia/workers/analysis/llm_analyzer.py b/turbinia/workers/analysis/llm_analyzer.py
index 25d6caf92..3a6a241c7 100644
--- a/turbinia/workers/analysis/llm_analyzer.py
+++ b/turbinia/workers/analysis/llm_analyzer.py
@@ -64,8 +64,10 @@
 "**Artifact Content (Part {i} of {chunks_len}):** \n```\n{chunk}\n```"
 """
 PRIORITY_PROMPT = """
-Please set the findings priority, the answer can only be one of:
-[LOW, MEDIUM, HIGH, CRITICAL]
+Please set the findings priority; your answer must be a single word from the following list: [LOW, MEDIUM, HIGH, CRITICAL]
+
+**Example answer:**
+CRITICAL
 """
 SUMMARY_PROMPT = """
 Please summarize all findings in a single statement, keep summary short and don't describe the summary
@@ -75,7 +77,7 @@
 class LLMAnalyzerTask(workers.TurbiniaTask):
   """LLM analysis task for selected history, logs and config files."""
 
-  # Input Evidence is ExportedFileArtifact so does not need to be preprocessed.
+  # Input Evidence ExportedFileArtifactLLM does not need to be preprocessed.
   REQUIRED_STATES = []
 
   def run(self, evidence, result):
@@ -91,7 +93,7 @@ def run(self, evidence, result):
     result.log(f"Running LLMAnalyzerTask task on {evidence.artifact_name}")
 
     # Where to store the resulting output file.
-    output_file_name = "llm_analysis.txt"
+    output_file_name = f"{evidence.artifact_name}-llm_analysis.txt"
     output_file_path = os.path.join(self.output_dir, output_file_name)
     result.log(f"LLMAnalyzerTask output_file_path {output_file_path}")
     # Set the output file as the data source for the output evidence.
@@ -157,19 +159,21 @@ def llm_analyze_artifact(self, artifact_content, artifact_name):
       # Send 'prompt' to your Gemini-1.0-pro model
       (chunk_report, history_session) = client.prompt_with_history(
           content_prompt_chunk, history_session)
-      report += chunk_report if not report else "\n" + chunk_report
+      report += (
+          chunk_report.strip() if not report else "\n" +
+          chunk_report.strip())
     (priority, history_session) = client.prompt_with_history(
         PRIORITY_PROMPT, history_session)
     (summary, _) = client.prompt_with_history(SUMMARY_PROMPT, history_session)
-    if priority == "MEDIUM":
-      priority = workers.Priority.MEDIUM
-    elif priority == "HIGH":
-      priority = workers.Priority.HIGH
-    elif priority == "CRITICAL":
+    if "CRITICAL" in priority.upper():
       priority = workers.Priority.CRITICAL
+    elif "HIGH" in priority.upper():
+      priority = workers.Priority.HIGH
+    elif "MEDIUM" in priority.upper():
+      priority = workers.Priority.MEDIUM
     else:
       priority = workers.Priority.LOW
-    return (report, priority, summary)
+    return (report.strip(), priority, summary.replace("\n", ""))
 
   def split_into_chunks(self, text, max_size):
     """Splits text into chunks respecting token limits."""
diff --git a/turbinia/workers/analysis/llm_analyzer_test.py b/turbinia/workers/analysis/llm_analyzer_test.py
index bc2967a0b..47ea31ca4 100644
--- a/turbinia/workers/analysis/llm_analyzer_test.py
+++ b/turbinia/workers/analysis/llm_analyzer_test.py
@@ -48,7 +48,7 @@ def test_llm_analyze_artifact(self, mock_gen_model, mock_gen_config):
     chat_instance.send_message.assert_called_with(
         "\nPlease summarize all findings in a single statement, keep summary"
         " short and don't describe the summary\n")
-    self.assertEqual(report, self.BAD_CONFIG_REPORT)
+    self.assertEqual(report, self.BAD_CONFIG_REPORT.strip())
     self.assertEqual(priority, workers.Priority.CRITICAL)
     self.assertEqual(summary, self.BAD_CONFIG_SUMMARY)
 
diff --git a/turbinia/workers/artifact.py b/turbinia/workers/artifact.py
index 453bb410d..c964ce13d 100644
--- a/turbinia/workers/artifact.py
+++ b/turbinia/workers/artifact.py
@@ -19,7 +19,7 @@
 import os
 
 from turbinia import config
-from turbinia.evidence import ExportedFileArtifact
+from turbinia import evidence as evidence_module
 from turbinia.evidence import EvidenceState as state
 from turbinia.workers import TurbiniaTask
 
@@ -29,10 +29,11 @@ class FileArtifactExtractionTask(TurbiniaTask):
 
   REQUIRED_STATES = [state.ATTACHED, state.CONTAINER_MOUNTED]
 
-  def __init__(self, artifact_name='FileArtifact'):
+  def __init__(self, artifact_name='FileArtifact', llm_artifact=False):
     super(FileArtifactExtractionTask, self).__init__()
     self.artifact_name = artifact_name
     self.job_name = "FileArtifactExtractionJob"
+    self.llm_artifact = llm_artifact
 
   def run(self, evidence, result):
     """Extracts artifacts using Plaso image_export.py.
@@ -94,9 +95,13 @@ def run(self, evidence, result):
           f'image_export.py failed for artifact {self.artifact_name:s}.')
       return result
 
+    artifact_type = evidence_module.ExportedFileArtifact
+    if self.llm_artifact:
+      artifact_type = evidence_module.ExportedFileArtifactLLM
+
     for dirpath, _, filenames in os.walk(export_directory):
       for filename in filenames:
-        exported_artifact = ExportedFileArtifact(
+        exported_artifact = artifact_type(
             artifact_name=self.artifact_name, source_path=os.path.join(
                 dirpath, filename))
         result.log(f'Adding artifact {filename:s}')
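
Note on the priority mapping in llm_analyzer.py above: the rewrite substring-matches the model's reply (most severe label first) instead of comparing with ==, so answers such as "Priority: CRITICAL" or a bare label with a trailing newline still resolve correctly. Below is a minimal standalone sketch of that logic; the Priority enum is a hypothetical stand-in for turbinia's workers.Priority, and its values are illustrative only:

    from enum import IntEnum


    class Priority(IntEnum):
      """Stand-in for turbinia.workers.Priority; values are illustrative."""
      CRITICAL = 10
      HIGH = 20
      MEDIUM = 50
      LOW = 80


    def parse_priority(answer):
      """Maps a free-form LLM reply to a Priority, most severe label first.

      Substring matching tolerates replies like "Priority: CRITICAL" or a
      label followed by punctuation or a trailing newline.
      """
      answer = answer.upper()
      if "CRITICAL" in answer:
        return Priority.CRITICAL
      if "HIGH" in answer:
        return Priority.HIGH
      if "MEDIUM" in answer:
        return Priority.MEDIUM
      return Priority.LOW


    assert parse_priority("CRITICAL\n") is Priority.CRITICAL
    assert parse_priority("The priority is High.") is Priority.HIGH
    assert parse_priority("no findings") is Priority.LOW

Checking CRITICAL before HIGH and MEDIUM means a reply that mentions several labels resolves to the most severe one, which is the conservative choice for triage.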