From e68d65da5da85f3d3f051e5b73d97bdaba09d360 Mon Sep 17 00:00:00 2001
From: Alexander Bilz
Date: Sat, 20 Jan 2024 16:58:31 +0100
Subject: [PATCH] Revert "fix: restore parser capability for MS Teams v1"

This reverts commit 43339324f72a9935edffdc6391e798b4135bb8d3.
---
 src/forensicsim/parser.py | 134 +++++++++++---------------------------
 1 file changed, 39 insertions(+), 95 deletions(-)

diff --git a/src/forensicsim/parser.py b/src/forensicsim/parser.py
index 9664e5a..5c2c3dd 100644
--- a/src/forensicsim/parser.py
+++ b/src/forensicsim/parser.py
@@ -1,11 +1,10 @@
 import json
-import warnings
 from dataclasses import dataclass, field
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Optional, Union
+from typing import Any, Optional
 
-from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
+from bs4 import BeautifulSoup
 from dataclasses_json import (
     DataClassJsonMixin,
     LetterCase,
@@ -15,17 +14,14 @@
 from forensicsim.backend import parse_db, write_results_to_json
 
-# Suppress Beautiful Soup warnings
-warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)
-
 
 
-def strip_html_tags(value: str) -> str:
+def strip_html_tags(value):
     # Get the text of any embedded html, such as divs, a href links
     soup = BeautifulSoup(value, features="html.parser")
     return soup.get_text()
 
 
-def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
+def decode_dict(properties):
     if isinstance(properties, bytes):
         soup = BeautifulSoup(properties, features="html.parser")
         properties = properties.decode(soup.original_encoding)
@@ -39,11 +35,11 @@ def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
     return json.loads(properties, strict=False)
 
 
-def decode_timestamp(content_utf8_encoded: str) -> datetime:
+def decode_timestamp(content_utf8_encoded) -> datetime:
     return datetime.utcfromtimestamp(int(content_utf8_encoded) / 1000)
 
 
-def encode_timestamp(timestamp: Optional[datetime]) -> Optional[str]:
+def encode_timestamp(timestamp) -> Optional[str]:
     if timestamp is not None:
         return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
     return None
@@ -72,17 +68,13 @@ class Meeting(DataClassJsonMixin):
         default="meeting", metadata=config(field_name="record_type")
     )
 
-    def __eq__(self, other: object) -> bool:
-        if not isinstance(other, Meeting):
-            return NotImplemented
-        return self.cached_deduplication_key == other.cached_deduplication_key
+    def __eq__(self, other):
+        return self.cached_deduplication_key == other.cachedDeduplicationKey
 
-    def __hash__(self) -> int:
+    def __hash__(self):
         return hash(self.cached_deduplication_key)
 
-    def __lt__(self, other: object) -> bool:
-        if not isinstance(other, Meeting):
-            return NotImplemented
+    def __lt__(self, other):
         return self.cached_deduplication_key < other.cached_deduplication_key
 
 
@@ -108,7 +100,7 @@ class Message(DataClassJsonMixin):
     is_from_me: Optional[bool] = None
     message_kind: Optional[str] = None
     messagetype: Optional[str] = None
-    original_arrival_time: Optional[str] = None
+    originalarrivaltime: Optional[str] = None
     properties: dict[str, Any] = field(
         default_factory=dict, metadata=config(decoder=decode_dict)
     )
@@ -124,28 +116,23 @@
         default="message", metadata=config(field_name="record_type")
     )
 
-    def __post_init__(self) -> None:
+    def __post_init__(self):
         if self.cached_deduplication_key is None:
             self.cached_deduplication_key = str(self.creator) + str(
                 self.clientmessageid
             )
-        # change record type depending on properties
         if "call-log" in self.properties:
             self.record_type = "call"
         if "activity" in self.properties:
             self.record_type = "reaction"
 
-    def __eq__(self, other: object) -> bool:
-        if not isinstance(other, Message):
-            return NotImplemented
+    def __eq__(self, other):
         return self.cached_deduplication_key == other.cached_deduplication_key
 
-    def __hash__(self) -> int:
+    def __hash__(self):
         return hash(self.cached_deduplication_key)
 
-    def __lt__(self, other: object) -> bool:
-        if not isinstance(other, Message):
-            return NotImplemented
+    def __lt__(self, other):
         return self.cached_deduplication_key < other.cached_deduplication_key
 
 
@@ -165,26 +152,21 @@
         default="contact", metadata=config(field_name="record_type")
     )
 
-    def __eq__(self, other: object) -> bool:
-        if not isinstance(other, Contact):
-            return NotImplemented
+    def __eq__(self, other):
         return self.mri == other.mri
 
-    def __hash__(self) -> int:
+    def __hash__(self):
         return hash(self.mri)
 
-    def __lt__(self, other: object) -> bool:
-        if not isinstance(other, Contact):
-            return NotImplemented
+    def __lt__(self, other):
         return self.mri < other.mri
 
 
 def _parse_people(people: list[dict]) -> set[Contact]:
     parsed_people = set()
     for p in people:
-        p |= p.get("value", {})
         p |= {"origin_file": p.get("origin_file")}
-
+        p |= p.get("value", {})
         parsed_people.add(Contact.from_dict(p))
     return parsed_people
 
@@ -198,68 +180,33 @@ def _parse_buddies(buddies: list[dict]) -> set[Contact]:
             parsed_buddies.add(Contact.from_dict(b_of_b))
     return parsed_buddies
 
 
-# Conversations can contain multiple artefacts
-# -> If type:Meeting then its a meeting
+
 def _parse_conversations(conversations: list[dict]) -> set[Meeting]:
     cleaned_conversations = set()
     for c in conversations:
-        if c.get("value", {}).get("type", "") == "Meeting" and "meeting" in c.get(
-            "value", {}
-        ).get("threadProperties", {}):
-            c_value = c.get("value", {})
-            c |= c_value
-            c |= {"thread_properties": c_value.get("threadProperties", {})}
-            c |= {"cached_deduplication_key": c.get("id")}
+        last_message = c.get("value", {}).get("lastMessage", {})
+
+        c |= {
+            "cachedDeduplicationKey": last_message.get("cachedDeduplicationKey"),
+        }
+
+        if c.get("type", "") == "Meeting" and "meeting" in c.get(
+            "threadProperties", {}
+        ):
             cleaned_conversations.add(Meeting.from_dict(c))
+
     return cleaned_conversations
 
 
 def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
     cleaned_reply_chains = set()
-    for rc in reply_chains:
-
-
-        # Skip empty records
-        if rc["value"] is None:
-            continue
-
-        rc |= rc.get("value", {})
-        rc |= {"origin_file": rc.get("origin_file")}
-
-        message_dict = {}
-        if rc.get("value", {}).get("messageMap", {}) or rc.get("value", {}).get(
-            "messages", {}
-        ):
-            if rc.get("value", {}).get("messageMap", {}):
-                message_dict = rc.get("value", {}).get("messageMap", {})
-            else:
-                message_dict = rc.get("value", {}).get("messages", {})
-
-        for k in message_dict:
-            md = message_dict[k]
-
-            if (
-                md.get("messagetype", "") == "RichText/Html"
-                or md.get("messagetype", "") == "Text"
-            ):
-                rc |= {"cached_deduplication_key": md.get("cachedDeduplicationKey")}
-                rc |= {"clientmessageid": md.get("clientmessageid")}
-                rc |= {"composetime": md.get("composetime")}
-                rc |= {"conversation_id": md.get("conversationId")}
-                rc |= {"content": md.get("content")}
-                rc |= {"contenttype": md.get("contenttype")}
-                rc |= {"created_time": md.get("createdTime")}
-                rc |= {"creator": md.get("creator")}
-                rc |= {"is_from_me": md.get("isFromMe")}
-                rc |= {"messagetype": md.get("messagetype")}
-                rc |= {"messageKind": md.get("messageKind")}
-                rc |= {"client_arrival_time": md.get("clientArrivalTime")}
-                rc |= {"original_arrival_time": md.get("originalarrivaltime")}
-                rc |= {"version": md.get("version")}
-                rc |= {"properties": md.get("properties")}
-
-                cleaned_reply_chains.add(Message.from_dict(rc))
+    for rc in reply_chains:
+        for message_values in rc.get("value", {}).get("messages", {}).values():
+            message_values |= {
+                "origin_file": rc.get("origin_file"),
+            }
+            cleaned_reply_chains.add(Message.from_dict(message_values))
     return cleaned_reply_chains
 
 
@@ -287,13 +234,10 @@ def parse_records(records: list[dict]) -> list[dict]:
     return [r.to_dict() for r in parsed_records]
 
 
-def process_db(input_path: Path, output_path: Path, blob_path: Path=None, do_not_filter: bool = True) -> None:
+def process_db(input_path: Path, output_path: Path):
     if not input_path.parts[-1].endswith(".leveldb"):
        raise ValueError(f"Expected a leveldb folder. Path: {input_path}")
 
-    if blob_path is not None and not blob_path.parts[-1].endswith(".blob"):
-        raise ValueError(f"Expected a .blob folder. Path: {blob_path}")
-
-    extracted_values = parse_db(input_path, blob_path, do_not_filter)
+    extracted_values = parse_db(input_path)
     parsed_records = parse_records(extracted_values)
-    write_results_to_json(parsed_records, output_path)
\ No newline at end of file
+    write_results_to_json(parsed_records, output_path)
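A behavioural note on the comparison methods this revert restores: the pre-revert implementations guard with isinstance and return NotImplemented for foreign operand types (for equality Python then falls back to an identity check; for ordering it raises TypeError), whereas the restored implementations dereference attributes on the other operand directly. A minimal standalone sketch of the difference; the Guarded/Unguarded class names are invented for illustration and are not part of the codebase:

class Guarded:
    def __init__(self, key):
        self.key = key

    def __eq__(self, other):
        # Returning NotImplemented lets Python try the reflected
        # operation, then fall back to an identity comparison.
        if not isinstance(other, Guarded):
            return NotImplemented
        return self.key == other.key


class Unguarded:
    def __init__(self, key):
        self.key = key

    def __eq__(self, other):
        # Assumes the other operand has a "key" attribute.
        return self.key == other.key


Guarded("a") == 1    # False, via the NotImplemented fallback
Unguarded("a") == 1  # AttributeError: 'int' object has no attribute 'key'

Relatedly, the restored Meeting.__eq__ compares against other.cachedDeduplicationKey (camelCase), while __hash__ and __lt__ use the snake_case cached_deduplication_key field; unless that camelCase attribute is set elsewhere, comparing two Meeting records for equality will raise AttributeError under the restored code.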