Skip to content

Commit

Permalink
Revert "fix: restore parser capability for MS Teams v1"
Browse files Browse the repository at this point in the history
This reverts commit 4333932.
  • Loading branch information
lxndrblz committed Jan 20, 2024
1 parent 4333932 commit e68d65d
Showing 1 changed file with 39 additions and 95 deletions.
134 changes: 39 additions & 95 deletions src/forensicsim/parser.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import json
import warnings
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, Optional

from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning
from bs4 import BeautifulSoup
from dataclasses_json import (
DataClassJsonMixin,
LetterCase,
Expand All @@ -15,17 +14,14 @@

from forensicsim.backend import parse_db, write_results_to_json

# Suppress Beautiful Soup warnings
warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning)


def strip_html_tags(value: str) -> str:
def strip_html_tags(value):
# Get the text of any embedded html, such as divs, a href links
soup = BeautifulSoup(value, features="html.parser")
return soup.get_text()


def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
def decode_dict(properties):
if isinstance(properties, bytes):
soup = BeautifulSoup(properties, features="html.parser")
properties = properties.decode(soup.original_encoding)
Expand All @@ -39,11 +35,11 @@ def decode_dict(properties: Union[bytes, str, dict]) -> dict[str, Any]:
return json.loads(properties, strict=False)


def decode_timestamp(content_utf8_encoded: str) -> datetime:
def decode_timestamp(content_utf8_encoded) -> datetime:
return datetime.utcfromtimestamp(int(content_utf8_encoded) / 1000)


def encode_timestamp(timestamp: Optional[datetime]) -> Optional[str]:
def encode_timestamp(timestamp) -> Optional[str]:
if timestamp is not None:
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")
return None
Expand Down Expand Up @@ -72,17 +68,13 @@ class Meeting(DataClassJsonMixin):
default="meeting", metadata=config(field_name="record_type")
)

def __eq__(self, other: object) -> bool:
if not isinstance(other, Meeting):
return NotImplemented
return self.cached_deduplication_key == other.cached_deduplication_key
def __eq__(self, other):
return self.cached_deduplication_key == other.cachedDeduplicationKey

def __hash__(self) -> int:
def __hash__(self):
return hash(self.cached_deduplication_key)

def __lt__(self, other: object) -> bool:
if not isinstance(other, Meeting):
return NotImplemented
def __lt__(self, other):
return self.cached_deduplication_key < other.cached_deduplication_key


Expand All @@ -108,7 +100,7 @@ class Message(DataClassJsonMixin):
is_from_me: Optional[bool] = None
message_kind: Optional[str] = None
messagetype: Optional[str] = None
original_arrival_time: Optional[str] = None
originalarrivaltime: Optional[str] = None
properties: dict[str, Any] = field(
default_factory=dict, metadata=config(decoder=decode_dict)
)
Expand All @@ -124,28 +116,23 @@ class Message(DataClassJsonMixin):
default="message", metadata=config(field_name="record_type")
)

def __post_init__(self) -> None:
def __post_init__(self):
if self.cached_deduplication_key is None:
self.cached_deduplication_key = str(self.creator) + str(
self.clientmessageid
)
# change record type depending on properties
if "call-log" in self.properties:
self.record_type = "call"
if "activity" in self.properties:
self.record_type = "reaction"

def __eq__(self, other: object) -> bool:
if not isinstance(other, Message):
return NotImplemented
def __eq__(self, other):
return self.cached_deduplication_key == other.cached_deduplication_key

def __hash__(self) -> int:
def __hash__(self):
return hash(self.cached_deduplication_key)

def __lt__(self, other: object) -> bool:
if not isinstance(other, Message):
return NotImplemented
def __lt__(self, other):
return self.cached_deduplication_key < other.cached_deduplication_key


Expand All @@ -165,26 +152,21 @@ class Contact(DataClassJsonMixin):
default="contact", metadata=config(field_name="record_type")
)

def __eq__(self, other: object) -> bool:
if not isinstance(other, Contact):
return NotImplemented
def __eq__(self, other):
return self.mri == other.mri

def __hash__(self) -> int:
def __hash__(self):
return hash(self.mri)

def __lt__(self, other: object) -> bool:
if not isinstance(other, Contact):
return NotImplemented
def __lt__(self, other):
return self.mri < other.mri


def _parse_people(people: list[dict]) -> set[Contact]:
parsed_people = set()
for p in people:
p |= p.get("value", {})
p |= {"origin_file": p.get("origin_file")}

p |= p.get("value", {})
parsed_people.add(Contact.from_dict(p))
return parsed_people

Expand All @@ -198,68 +180,33 @@ def _parse_buddies(buddies: list[dict]) -> set[Contact]:
parsed_buddies.add(Contact.from_dict(b_of_b))
return parsed_buddies

# Conversations can contain multiple artefacts
# -> If type is "Meeting" then it's a meeting

def _parse_conversations(conversations: list[dict]) -> set[Meeting]:
cleaned_conversations = set()
for c in conversations:
if c.get("value", {}).get("type", "") == "Meeting" and "meeting" in c.get(
"value", {}
).get("threadProperties", {}):
c_value = c.get("value", {})
c |= c_value
c |= {"thread_properties": c_value.get("threadProperties", {})}
c |= {"cached_deduplication_key": c.get("id")}
last_message = c.get("value", {}).get("lastMessage", {})

c |= {
"cachedDeduplicationKey": last_message.get("cachedDeduplicationKey"),
}

if c.get("type", "") == "Meeting" and "meeting" in c.get(
"threadProperties", {}
):
cleaned_conversations.add(Meeting.from_dict(c))

return cleaned_conversations


def _parse_reply_chains(reply_chains: list[dict]) -> set[Message]:
cleaned_reply_chains = set()
for rc in reply_chains:


# Skip empty records
if rc["value"] is None:
continue

rc |= rc.get("value", {})
rc |= {"origin_file": rc.get("origin_file")}

message_dict = {}
if rc.get("value", {}).get("messageMap", {}) or rc.get("value", {}).get(
"messages", {}
):
if rc.get("value", {}).get("messageMap", {}):
message_dict = rc.get("value", {}).get("messageMap", {})
else:
message_dict = rc.get("value", {}).get("messages", {})

for k in message_dict:
md = message_dict[k]

if (
md.get("messagetype", "") == "RichText/Html"
or md.get("messagetype", "") == "Text"
):
rc |= {"cached_deduplication_key": md.get("cachedDeduplicationKey")}
rc |= {"clientmessageid": md.get("clientmessageid")}
rc |= {"composetime": md.get("composetime")}
rc |= {"conversation_id": md.get("conversationId")}
rc |= {"content": md.get("content")}
rc |= {"contenttype": md.get("contenttype")}
rc |= {"created_time": md.get("createdTime")}
rc |= {"creator": md.get("creator")}
rc |= {"is_from_me": md.get("isFromMe")}
rc |= {"messagetype": md.get("messagetype")}
rc |= {"messageKind": md.get("messageKind")}
rc |= {"client_arrival_time": md.get("clientArrivalTime")}
rc |= {"original_arrival_time": md.get("originalarrivaltime")}
rc |= {"version": md.get("version")}
rc |= {"properties": md.get("properties")}

cleaned_reply_chains.add(Message.from_dict(rc))

for rc in reply_chains:
for message_values in rc.get("value", {}).get("messages", {}).values():
message_values |= {
"origin_file": rc.get("origin_file"),
}
cleaned_reply_chains.add(Message.from_dict(message_values))
return cleaned_reply_chains


Expand Down Expand Up @@ -287,13 +234,10 @@ def parse_records(records: list[dict]) -> list[dict]:
return [r.to_dict() for r in parsed_records]


def process_db(input_path: Path, output_path: Path, blob_path: Path=None, do_not_filter: bool = True) -> None:
def process_db(input_path: Path, output_path: Path):
if not input_path.parts[-1].endswith(".leveldb"):
raise ValueError(f"Expected a leveldb folder. Path: {input_path}")

if blob_path is not None and not blob_path.parts[-1].endswith(".blob"):
raise ValueError(f"Expected a .blob folder. Path: {blob_path}")

extracted_values = parse_db(input_path, blob_path, do_not_filter)
extracted_values = parse_db(input_path)
parsed_records = parse_records(extracted_values)
write_results_to_json(parsed_records, output_path)
write_results_to_json(parsed_records, output_path)

0 comments on commit e68d65d

Please sign in to comment.