-
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
27787af
commit de9d855
Showing
9 changed files
with
436 additions
and
189 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,191 +1,130 @@ | ||
"""Run with 64-bit python! Respawn .bsp files are large!""" | ||
import difflib | ||
import io | ||
import itertools | ||
import re | ||
from typing import Dict, Iterable, List | ||
|
||
|
||
r1_dir = "E:/Mod/Titanfall/maps" | ||
r1o_dir = "E:/Mod/TitanfallOnline/maps" | ||
r2_dir = "E:/Mod/Titanfall2/maps" | ||
|
||
shared_maps = [("mp_angel_city", "mp_angel_city"), | ||
("mp_colony", "mp_colony02"), | ||
("mp_relic", "mp_relic02"), | ||
("mp_rise", "mp_rise"), | ||
("mp_wargames", "mp_wargames")] | ||
# ^ r1 map name, r2 map name | ||
|
||
|
||
def diff_bsps(bsp1, bsp2, full=False) -> str: | ||
"""WARNING: full diffs can be incredibly large!""" | ||
out = [] | ||
if bsp1.folder == bsp2.folder: | ||
out.append(f"Comparing {bsp1} -> {bsp2}...") | ||
else: | ||
out.append(f"Comparing {bsp1.folder}/{bsp1} -> {bsp2.folder}/{bsp2}...") | ||
# NOTE: comparing lumps by index, same number of lumps expected | ||
for lump1, lump2 in zip(bsp1.branch.LUMP, bsp2.branch.LUMP): | ||
lump1 = lump1.name | ||
lump2 = lump2.name | ||
# diff headers | ||
if lump1 not in bsp1.headers or lump2 not in bsp2.headers: | ||
continue # lazy fix for rbsp externals | ||
# TODO: note absent headers (not just for respawn.ExternalLumpManager!) | ||
bsp1_header = bsp1.headers[lump1] | ||
bsp2_header = bsp2.headers[lump2] | ||
lump_name = lump1 if lump1 == lump2 else f"{lump1} -> {lump2}" | ||
# NOTE: fourCC (decompressed size) vs length is not calculated | ||
# -- in fact, no check to check opposing compressed state (one compressed, one uncompressed) | ||
# -- however, LZMA compressed lump contents are always decompressed before comparison | ||
header_diff = "".join(["Y" if bsp1_header.offset == bsp2_header.offset else "N", | ||
"Y" if bsp1_header.length == bsp2_header.length else "N", | ||
"Y" if bsp1_header.version == bsp2_header.version else "N", | ||
"Y" if bsp1_header.fourCC == bsp2_header.fourCC else "N"]) | ||
# diff lump contents | ||
try: | ||
lump_1_contents = bsp1.lump_as_bytes(lump1) | ||
lump_2_contents = bsp2.lump_as_bytes(lump2) | ||
except Exception as exc: | ||
out.append(f"{lump_name} {header_diff} ???? {exc}") | ||
continue # skip this lump | ||
lumps_match = bool(lump_1_contents == lump_2_contents) | ||
contents_diff = "YES!" if lumps_match else "NOPE" | ||
out.append(f"{lump_name} {header_diff} {contents_diff}") | ||
# was a lump removed / added? | ||
if (len(lump_1_contents) == 0 or len(lump_2_contents) == 0) and not lumps_match: | ||
out.append(" ".join(["+" if hasattr(bsp1, lump1) else "-", f"{bsp1.filename}.{lump1}"])) | ||
out.append(" ".join(["+" if hasattr(bsp2, lump2) else "-", f"{bsp2.filename}.{lump2}"])) | ||
# detailed comparisons | ||
elif full: | ||
if not lumps_match: | ||
# TODO: measure the scale of the differences | ||
if lump1 in bsp1.branch.LUMP_CLASSES and lump2 in bsp2.branch.LUMP_CLASSES: | ||
diff = difflib.unified_diff([lc.__repr__() for lc in getattr(bsp1, lump1)], | ||
[lc.__repr__() for lc in getattr(bsp2, lump2)], | ||
f"{bsp1.filename}.{lump1}", | ||
f"{bsp1.filename}.{lump1}") | ||
out.extend(diff) | ||
# SPECIAL_LUMP_CLASSES | ||
elif all([ln == "ENTITIES" for ln in (lump1, lump2)]): | ||
out.append(diff_entities(bsp1.ENTITIES, bsp2.ENTITIES)) | ||
elif all([ln == "PAKFILE" for ln in (lump1, lump2)]): | ||
# NOTE: this will fail on nexon.cso2 bsps, as their pakfiles are unmapped | ||
out.append(diff_pakfiles(bsp1, bsp2)) | ||
# TODO: GAME_LUMP diff model_names | ||
else: # BASIC_LUMP_CLASSES / general raw bytes | ||
# NOTE: xxd line numbers prevent accurately tracing insertions | ||
# TODO: set xxd width to cover LumpClass._format, 1 entry per line | ||
# -- if formats don't match, give an option to skip | ||
diff = difflib.context_diff(xxd(io.BytesIO(lump_1_contents)), | ||
xxd(io.BytesIO(lump_2_contents)), | ||
f"{bsp1.filename}.{lump1}", | ||
f"{bsp2.filename}.{lump2}") | ||
# TODO: run xxd without creating line numbers | ||
# -- then, generate line numbers from diff & update diff with these line numbers | ||
out.extend(diff) | ||
from typing import Any, Dict, Generator, List | ||
|
||
from . import base | ||
from . import shared | ||
|
||
from bsp_tool import branches | ||
from bsp_tool.base import Bsp | ||
from bsp_tool.lumps import BasicBspLump, RawBspLump, ExternalRawBspLump | ||
|
||
|
||
def diff_lumps(old_lump: Any, new_lump: Any) -> base.Diff: | ||
LumpClasses = set() | ||
for lump in (old_lump, new_lump): | ||
if issubclass(lump.__class__, BasicBspLump): | ||
LumpClasses.add(lump.LumpClass) | ||
else: # SpecialLumpClass / RawBspLump | ||
LumpClasses.add(lump.__class__) | ||
# match LumpClasses to a base.Diff subclass | ||
# TODO: mismatched lump type diffs (substitute defaults for alternate versions?) | ||
# -- should only be used for extremely similar lumps | ||
if len(LumpClasses) > 1: | ||
# AbridgedDiff? | ||
raise NotImplementedError("Cannot diff lumps of differring LumpClass") | ||
if LumpClasses == {branches.shared.Entities}: | ||
DiffClass = shared.EntitiesDiff | ||
elif LumpClasses == {branches.shared.PakFile}: | ||
DiffClass = shared.PakFileDiff | ||
elif RawBspLump in LumpClasses or ExternalRawBspLump in LumpClasses: | ||
# TODO: core.xxd diff | ||
raise NotImplementedError("Cannot diff raw lumps") | ||
# if all([issubclass(lc, branches.base.BitField) for lc in LumpClasses]): | ||
# DiffClass = base.BitFieldDiff | ||
# if all([issubclass(lc, branches.base.MappedArray) for lc in LumpClasses]): | ||
# DiffClass = base.MappedArrayDiff | ||
# if all([issubclass(lc, branches.base.Struct) for lc in LumpClasses]): | ||
# DiffClass = base.StructDiff | ||
else: # default | ||
DiffClass = base.Diff | ||
return DiffClass(old_lump, new_lump) | ||
|
||
|
||
class BspDiff: | ||
"""deferred diffs of lumps & headers etc.""" | ||
old: Bsp | ||
new: Bsp | ||
|
||
def __init__(self, old: Bsp, new: Bsp): | ||
if old.branch != new.branch: | ||
raise NotImplementedError("Cannot diff bsps from different branches") | ||
self.old = old | ||
self.new = new | ||
self.headers = HeadersDiff(old.headers, new.headers) | ||
# NOTE: a change in header offsets does not imply a change in lump data | ||
# TODO: other metadata (file magic, version, revision, signature etc.) | ||
|
||
def __getattr__(self, lump_name: str) -> Any: | ||
old_lump = getattr(self.old, lump_name, None) | ||
new_lump = getattr(self.new, lump_name, None) | ||
no_old_lump = old_lump is None | ||
no_new_lump = new_lump is None | ||
if no_old_lump and no_new_lump: | ||
raise AttributeError(f"Neither bsp has {lump_name} lump to be diffed") | ||
elif no_old_lump or no_new_lump: | ||
return NoneDiff(old_lump, new_lump) | ||
else: | ||
out.extend([str(bsp1_header), str(bsp2_header)]) | ||
return "\n".join(out) | ||
|
||
|
||
def diff_rbsps(rbsp1, rbsp2, external=True, full=False) -> str: | ||
"""compare internal to external lumps with diff_rbsps(bsp, bsp.external, external=False)""" | ||
out = ["*** .bsp files ***", diff_bsps(rbsp1, rbsp2, full)] | ||
# NOTE: could confirm ent_types against ENTITY_PARTITION lump | ||
# -- however respawn seems to always use every .ent, leaving the script file empty if unused | ||
# -- this makes ENTITY_PARTITION practically useless, as it never changes | ||
out.append("*** .ent files ***") | ||
for ent_type in ("env", "fx", "script", "snd", "spawn"): | ||
ent_lump = f"ENTITIES_{ent_type}" | ||
lump1 = getattr(rbsp1, ent_lump, list()) | ||
lump2 = getattr(rbsp2, ent_lump, list()) | ||
ents_match = "YES!" if lump1 == lump2 else "NOPE" | ||
out.append(f"{ent_lump} {ents_match}") | ||
if full and ents_match == "NOPE": | ||
out.append(diff_entities(lump1, lump2)) | ||
if external: | ||
out.append("*** .bsp_lump files ***") | ||
out.append(diff_bsps(rbsp1.external, rbsp2.external, full)) | ||
# TODO: close each lump after reading to save memory & avoid the "Too many open files" OSError | ||
return "\n".join(out) | ||
|
||
|
||
EntityLump = List[Dict[str, str]] | ||
# ^ [{"key": "value"}] | ||
|
||
|
||
def diff_entities(lump1: EntityLump, lump2: EntityLump) -> str: | ||
out = [] | ||
for i, e1, e2 in zip(itertools.count(), lump1, lump2): | ||
if e1 != e2: | ||
out.extend([f"Entity #{i}", " {"]) | ||
# TODO: be a little dynamic to make sure keys align | ||
# -- otherwise many false negatives might appear in a relatively simple diff | ||
for k1, k2, v1, v2 in zip(e1.keys(), e2.keys(), e1.values(), e2.values()): | ||
if v1 != v2: | ||
out.extend([f'- "{k1}" "{v1}"', | ||
f'+ "{k2}" "{v2}"']) | ||
else: | ||
out.append(f' "{k1}" "{v1}"') | ||
out.append(" }") | ||
return "\n".join(out) | ||
|
||
|
||
def diff_pakfiles(bsp1, bsp2) -> str: | ||
"""Works on any ValveBsp based .bsp (except CS:O2)""" | ||
out = [] | ||
pak1_files = bsp1.PAKFILE.namelist() | ||
pak2_files = bsp2.PAKFILE.namelist() | ||
for filename in pak1_files: | ||
absent = filename not in pak2_files | ||
out.append(f"- {filename}" if absent else f" {filename}") | ||
if not absent: | ||
file1 = bsp1.PAKFILE.read(filename) | ||
file2 = bsp2.PAKFILE.read(filename) | ||
if file1 == file2: # skip matches | ||
continue | ||
out[-1] = f"~ {filename}" | ||
out.extend(difflib.context_diff(xxd(io.BytesIO(file1)), | ||
xxd(io.BytesIO(file2)), | ||
f"{bsp1.filename}.PAKFILE.{filename}", | ||
f"{bsp2.filename}.PAKFILE.{filename}")) | ||
out.extend([f"+ {f}" for f in pak2_files if f not in pak1_files]) | ||
return "\n".join(out) | ||
|
||
|
||
# binary diff helpers | ||
def split(iterable: Iterable, chunk_size: int) -> Iterable: | ||
for i, _ in enumerate(iterable[::chunk_size]): | ||
yield iterable[i * chunk_size:(i + 1) * chunk_size] | ||
|
||
|
||
def xxd(data: io.BytesIO, width: int = 16) -> str: | ||
"""view a binary file like with a certain hex editor""" | ||
out = list() | ||
allowed_chars = re.compile(r"[a-zA-Z0-9/\\]") | ||
i, bytes_ = 0, data.read(width) | ||
while bytes_ != b"": | ||
address = f"0x{i * width:08X}" | ||
hex_ = " ".join([f"{b:02X}" for b in bytes_]) | ||
if len(hex_) < 3 * width: # pad last line of hex with spaces | ||
hex_ += " " * (3 * width - len(hex_)) | ||
ascii_ = "".join([chr(b) if allowed_chars.match(chr(b)) else "." for b in bytes_]) | ||
out.append(f"{address}: {hex_} {ascii_}\n") | ||
i, bytes_ = i + 1, data.read(width) | ||
return out | ||
|
||
|
||
if __name__ == "__main__": | ||
import os | ||
import sys | ||
sys.path.insert(0, r"C:\Users\Jared\Documents\GitHub\bsp_tool") | ||
import bsp_tool # run from top-level | ||
|
||
for r1_map, r2_map in shared_maps: | ||
with open(f"{r1_map}.diff", "w") as log_file: | ||
print(f"Writing {r1_map}.diff ...") | ||
r1_bsp = bsp_tool.load_bsp(os.path.join(r1_dir, f"{r1_map}.bsp")) | ||
r2_bsp = bsp_tool.load_bsp(os.path.join(r2_dir, f"{r2_map}.bsp")) | ||
log_file.write(diff_rbsps(r1_bsp, r2_bsp)) | ||
diff = diff_lumps(old_lump, new_lump) | ||
setattr(self, lump_name, diff) # cache | ||
return diff | ||
|
||
def save(self, base_filename: str, log_mode: base.LogMode = base.LogMode.VERBOSE): | ||
"""generate & save .diff files""" | ||
# for each lump (match by name) | ||
# filename.lump.00.ENTITIES.diff: old_goldsrc.ENTITIES (0) -> new_blue_shift.ENTITIES (1) | ||
# filename.lump.01.PLANES.diff: old_goldsrc.PLANES (1) -> new_blue_shift.PLANES (0) | ||
# RespawnBsp | ||
# -- filename.ENTITITES.fx.diff: filename_fx.ent | ||
# -- filename.lump.00XX.LUMP_NAME.diff | ||
# -- filename.lump.00XX.LUMP_NAME.bsp_lump.diff | ||
# filename.bsp.diff: headers & Y/N lump matches | ||
raise NotImplementedError() | ||
|
||
|
||
class NoneDiff(base.Diff): | ||
"""for diffing against None""" | ||
def short_stats(self) -> str: | ||
brand_new = self.old is None | ||
assert brand_new or self.new is None | ||
if brand_new: | ||
return f"{len(self.new)} insertions(+)" | ||
else: | ||
return f"{len(self.old)} deletions(-)" | ||
|
||
def unified_diff(self) -> List[str]: | ||
return [self.short_stats()] | ||
|
||
|
||
class HeadersDiff(base.Diff): | ||
# TODO: support comparisons between different branches | ||
# TODO: how do we communicate a change in branch order? | ||
# -- modern_warfare lump order & count is unique | ||
# -- will probably need it's own class | ||
old: Dict[str, Any] | ||
new: Dict[str, Any] | ||
_cache = Dict[str, List[str]] | ||
# NOTE: changes on offset can be knock on affect of changes to an earlier lump | ||
|
||
def __init__(self, old: Dict[str, Any], new: Dict[str, Any]): | ||
super().__init__(old, new) | ||
self._cache = dict() | ||
|
||
def __getitem__(self, lump_name: str) -> str: | ||
if lump_name not in {*self.old, *self.new}: | ||
raise KeyError(f"No {lump_name} header to diff") | ||
diff = self._cache.get(lump_name) | ||
if diff is None: | ||
old = f"{lump_name} {self.old[lump_name]!r}\n" | ||
new = f"{lump_name} {self.new[lump_name]!r}\n" | ||
diff = list(difflib.unified_diff([old], [new])) | ||
self._cache[lump_name] = diff | ||
return diff | ||
|
||
def short_stats(self) -> str: | ||
raise NotImplementedError() | ||
# TODO: how to summarise? | ||
|
||
def unified_diff(self) -> Generator[str, None, None]: | ||
for lump_name in self.old: | ||
for line in self[lump_name]: | ||
yield line |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import difflib | ||
import enum | ||
from typing import Generator, Iterable, List | ||
|
||
|
||
class LogMode(enum.Enum): | ||
FAST = 0 # no diff | ||
VERBOSE = 1 # small diff | ||
VERY_VERBOSE = 2 # maximum diff | ||
|
||
|
||
class Diff: | ||
old: Iterable[object] | ||
new: Iterable[object] | ||
|
||
def __init__(self, old: Iterable[object], new: Iterable[object]): | ||
self.old = old | ||
self.new = new | ||
|
||
def has_no_changes(self) -> bool: | ||
return self.old == self.new | ||
|
||
def as_text(self, log_mode=LogMode.VERBOSE) -> Generator[str, None, None]: | ||
"""formatted diff text, one line at a time""" | ||
if log_mode == LogMode.VERBOSE: | ||
yield self.short_stats() | ||
elif log_mode == LogMode.VERY_VERBOSE: # GENERATES A LOT OF TEXT! | ||
for line in self.unified_diff(): | ||
yield line | ||
else: # only raised if try to pull data from the generator | ||
raise NotImplementedError(f"Unexpected Log Mode: {log_mode}") | ||
|
||
def short_stats(self) -> str: | ||
"""mimick git diff --shortstat""" | ||
old = set(self.old) | ||
new = set(self.new) | ||
return f"{new.difference(old)} insertions(+) {old.difference(new)} deletions(-)" | ||
|
||
def unified_diff(self) -> List[str]: | ||
"""quick & dirty diff of __repr__""" | ||
# NOTE: if the __repr__ is "<Classname @ 0xMEMORYADDRESS>" equality cannot be detemined | ||
old = [repr(x) for x in self.old] | ||
new = [repr(x) for x in self.new] | ||
# TODO: metadata | ||
# --- old.name | ||
# +++ new.name | ||
for line in difflib.unified_diff(old, new): | ||
yield line | ||
|
||
|
||
# TODO: class BitFieldDiff(Diff): | ||
# TODO: class MappedArrayDiff(Diff): | ||
# TODO: class StructDiff(Diff): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import io | ||
import re | ||
|
||
|
||
def xxd(data: bytes, cols: int = 32, show_address: bool = False) -> str: | ||
"""view a binary file like with a certain hex editor""" | ||
data = io.BytesIO(data) | ||
allowed_chars = re.compile(r"[a-zA-Z0-9/\\]") | ||
address, bytes_ = 0, data.read(cols) | ||
while bytes_ != b"": | ||
hex_ = " ".join([f"{b:02X}" for b in bytes_]) | ||
if len(hex_) < 3 * cols: # last line needs padding | ||
hex_ += " " * (3 * cols - len(hex_)) | ||
ascii_ = "".join([c if allowed_chars.match(c) else "." for c in map(chr, bytes_)]) | ||
if show_address: | ||
yield f"0x{address:08X}: {hex_} {ascii_}\n" | ||
else: | ||
yield f"{hex_} {ascii_}\n" | ||
address, bytes_ = address + cols, data.read(cols) |
Oops, something went wrong.