Skip to content

Commit

Permalink
fix: restore parser capability for MS Teams v1
Browse files Browse the repository at this point in the history
  • Loading branch information
lxndrblz committed Jan 20, 2024
1 parent 241a740 commit 4333932
Showing 1 changed file with 95 additions and 39 deletions.
134 changes: 95 additions & 39 deletions src/forensicsim/parser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import warnings
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from typing import Any, Optional, Union

from bs4 import BeautifulSoup
from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from dataclasses_json import (
DataClassJsonMixin,
LetterCase,
Expand All @@ -14,14 +15,17 @@

from forensicsim.backend import parse_db, write_results_to_json

# Suppress Beautiful Soup warnings
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)

def strip_html_tags(value: str) -> str:
    """Return the plain-text content of *value* with any embedded HTML removed.

    Handles markup such as ``<div>`` wrappers and ``<a href>`` links by parsing
    with Beautiful Soup's built-in ``html.parser`` and extracting only the text.
    """
    return BeautifulSoup(value, features="html.parser").get_text()


def decode_dict(properties):
def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
if isinstance(properties, bytes):
soup = BeautifulSoup(properties, features="html.parser")
properties = properties.decode(soup.original_encoding)
Expand All @@ -35,11 +39,11 @@ def decode_dict(properties):
return json.loads(properties, strict=False)


def decode_timestamp(content_utf8_encoded: str) -> datetime:
    """Convert a millisecond Unix-epoch string to a naive UTC ``datetime``.

    Teams stores timestamps as JavaScript-style millisecond epochs, hence the
    division by 1000.

    Args:
        content_utf8_encoded: Unix timestamp in milliseconds, as a string.

    Returns:
        A timezone-naive ``datetime`` expressed in UTC (same value the
        deprecated ``datetime.utcfromtimestamp`` would have produced).
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12: build an
    # aware UTC datetime, then drop tzinfo to keep the original naive result.
    from datetime import timezone

    seconds = int(content_utf8_encoded) / 1000
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None)


def encode_timestamp(timestamp: Optional[datetime]) -> Optional[str]:
    """Serialize a ``datetime`` as ``YYYY-MM-DDTHH:MM:SS.ffffff``.

    Returns ``None`` when no timestamp is given, so optional fields stay
    optional in the JSON output.
    """
    if timestamp is None:
        return None
    return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
Expand Down Expand Up @@ -68,13 +72,17 @@ class Meeting(DataClassJsonMixin):
default="meeting", metadata=config(field_name="record_type")
)

def __eq__(self, other):
return self.cached_deduplication_key == other.cachedDeduplicationKey
def __eq__(self, other: object) -> bool:
if not isinstance(other, Meeting):
return NotImplemented
return self.cached_deduplication_key == other.cached_deduplication_key

def __hash__(self):
def __hash__(self) -> int:
return hash(self.cached_deduplication_key)

def __lt__(self, other):
def __lt__(self, other: object) -> bool:
if not isinstance(other, Meeting):
return NotImplemented
return self.cached_deduplication_key < other.cached_deduplication_key


Expand All @@ -100,7 +108,7 @@ class Message(DataClassJsonMixin):
is_from_me: Optional[bool] = None
message_kind: Optional[str] = None
messagetype: Optional[str] = None
originalarrivaltime: Optional[str] = None
original_arrival_time: Optional[str] = None
properties: dict[str, Any] = field(
default_factory=dict, metadata=config(decoder=decode_dict)
)
Expand All @@ -116,23 +124,28 @@ class Message(DataClassJsonMixin):
default="message", metadata=config(field_name="record_type")
)

def __post_init__(self):
def __post_init__(self) -> None:
if self.cached_deduplication_key is None:
self.cached_deduplication_key = str(self.creator) + str(
self.clientmessageid
)
# change record type depending on properties
if "call-log" in self.properties:
self.record_type = "call"
if "activity" in self.properties:
self.record_type = "reaction"

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, Message):
return NotImplemented
return self.cached_deduplication_key == other.cached_deduplication_key

def __hash__(self):
def __hash__(self) -> int:
return hash(self.cached_deduplication_key)

def __lt__(self, other):
def __lt__(self, other: object) -> bool:
if not isinstance(other, Message):
return NotImplemented
return self.cached_deduplication_key < other.cached_deduplication_key


Expand All @@ -152,21 +165,26 @@ class Contact(DataClassJsonMixin):
default="contact", metadata=config(field_name="record_type")
)

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, Contact):
return NotImplemented
return self.mri == other.mri

def __hash__(self):
def __hash__(self) -> int:
return hash(self.mri)

def __lt__(self, other):
def __lt__(self, other: object) -> bool:
if not isinstance(other, Contact):
return NotImplemented
return self.mri < other.mri


def _parse_people(people: list[dict]) -> set[Contact]:
    """Flatten raw people records into a set of :class:`Contact` objects.

    Each record's nested ``value`` dict is promoted to the top level; the
    ``origin_file`` field is re-applied afterwards so it survives the merge.
    """
    contacts = set()
    for record in people:
        record |= record.get("value", {})
        record |= {"origin_file": record.get("origin_file")}
        contacts.add(Contact.from_dict(record))
    return contacts

Expand All @@ -180,33 +198,68 @@ def _parse_buddies(buddies: list[dict]) -> set[Contact]:
parsed_buddies.add(Contact.from_dict(b_of_b))
return parsed_buddies


# Conversations can contain multiple artefacts
# -> If type:Meeting then it's a meeting
def _parse_conversations(conversations: list[dict]) -> set[Meeting]:
    """Extract :class:`Meeting` artefacts from raw conversation records.

    Only records whose nested ``value`` is typed ``"Meeting"`` and whose
    thread properties contain a ``"meeting"`` entry are kept; everything
    else in the conversation store is ignored here.
    """
    meetings = set()
    for record in conversations:
        value = record.get("value", {})
        is_meeting = value.get("type", "") == "Meeting"
        if is_meeting and "meeting" in value.get("threadProperties", {}):
            # Promote the nested value, then expose the fields the Meeting
            # dataclass expects under snake_case keys.
            record |= value
            record |= {"thread_properties": value.get("threadProperties", {})}
            record |= {"cached_deduplication_key": record.get("id")}
            meetings.add(Meeting.from_dict(record))
    return meetings


def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
    """Extract :class:`Message` artefacts from raw reply-chain records.

    Supports both storage layouts seen in Teams LevelDB data: newer records
    keep messages under ``messageMap``, older ones under ``messages``. Only
    plain text and rich-text messages are converted.
    """
    messages = set()
    for rc in reply_chains:
        # Skip empty records.
        if rc["value"] is None:
            continue

        rc |= rc.get("value", {})
        rc |= {"origin_file": rc.get("origin_file")}

        # Prefer messageMap (Teams v2); fall back to messages (Teams v1).
        value = rc.get("value", {})
        message_dict = value.get("messageMap", {}) or value.get("messages", {}) or {}

        for key in message_dict:
            md = message_dict[key]
            if md.get("messagetype", "") in ("RichText/Html", "Text"):
                # Copy the fields the Message dataclass expects, translating
                # camelCase source keys to the snake_case field names.
                rc |= {
                    "cached_deduplication_key": md.get("cachedDeduplicationKey"),
                    "clientmessageid": md.get("clientmessageid"),
                    "composetime": md.get("composetime"),
                    "conversation_id": md.get("conversationId"),
                    "content": md.get("content"),
                    "contenttype": md.get("contenttype"),
                    "created_time": md.get("createdTime"),
                    "creator": md.get("creator"),
                    "is_from_me": md.get("isFromMe"),
                    "messagetype": md.get("messagetype"),
                    "messageKind": md.get("messageKind"),
                    "client_arrival_time": md.get("clientArrivalTime"),
                    "original_arrival_time": md.get("originalarrivaltime"),
                    "version": md.get("version"),
                    "properties": md.get("properties"),
                }
                messages.add(Message.from_dict(rc))

    return messages


Expand Down Expand Up @@ -234,10 +287,13 @@ def parse_records(records: list[dict]) -> list[dict]:
return [r.to_dict() for r in parsed_records]


def process_db(
    input_path: Path,
    output_path: Path,
    blob_path: Optional[Path] = None,
    do_not_filter: bool = True,
) -> None:
    """Parse a Teams LevelDB store and write the records to a JSON file.

    Args:
        input_path: Path to a ``*.leveldb`` directory to parse.
        output_path: Destination file for the JSON results.
        blob_path: Optional path to the matching ``*.blob`` directory.
            (Annotated ``Optional[Path]`` — PEP 484 prohibits the implicit
            ``Path = None`` form; flagged by Ruff RUF013.)
        do_not_filter: When True, keep all records instead of filtering.

    Raises:
        ValueError: If ``input_path`` is not a ``.leveldb`` folder, or if a
            ``blob_path`` is given that is not a ``.blob`` folder.
    """
    if not input_path.parts[-1].endswith(".leveldb"):
        raise ValueError(f"Expected a leveldb folder. Path: {input_path}")

    if blob_path is not None and not blob_path.parts[-1].endswith(".blob"):
        raise ValueError(f"Expected a .blob folder. Path: {blob_path}")

    extracted_values = parse_db(input_path, blob_path, do_not_filter)
    parsed_records = parse_records(extracted_values)
    write_results_to_json(parsed_records, output_path)

0 comments on commit 4333932

Please sign in to comment.