diff --git a/examples/dis_demo.py b/examples/dis_demo.py index 74d1ea8..b9f2fea 100644 --- a/examples/dis_demo.py +++ b/examples/dis_demo.py @@ -10,6 +10,7 @@ Log.init_log("abcre", ".") ohre.set_log_level("info") ohre.set_log_print(True) + Log.info(f"START {__file__}") parser = argparse.ArgumentParser() parser.add_argument("dis_path", type=str, help="path to the dis file (ark_disasm-ed abc)") arg = parser.parse_args() @@ -28,8 +29,13 @@ print(f">> {asmstr}") # === reverse truly START - FUNC_IDX = 27 - # print(f">> before ControlFlow build {dis_file.methods[FUNC_IDX]._debug_vstr()}") + FUNC_IDX = 5 # 5: onWindowStageCreate, call loadContent and pass a mothod as para; 7: mothod that used as para + # print(f">> before CF {dis_file.methods[FUNC_IDX]._debug_vstr()}") panda_re.split_native_code_block(FUNC_IDX) - print(f">> after ControlFlow build {panda_re.dis_file.methods[FUNC_IDX]._debug_vstr()}") + print(f">> CF built {panda_re.dis_file.methods[FUNC_IDX]._debug_vstr()}") panda_re.trans_NAC_to_TAC(method_id=FUNC_IDX) + + # for idx in range(panda_re.method_len()): + # panda_re.split_native_code_block(idx) + # print(f">> [{idx}/{panda_re.method_len()}] CF built {panda_re.dis_file.methods[idx]._debug_vstr()}") + # panda_re.trans_NAC_to_TAC(method_id=idx) \ No newline at end of file diff --git a/ohre/abcre/dis/AsmArg.py b/ohre/abcre/dis/AsmArg.py index 253f31d..351403f 100644 --- a/ohre/abcre/dis/AsmArg.py +++ b/ohre/abcre/dis/AsmArg.py @@ -1,16 +1,20 @@ +from typing import Any, Dict, Iterable, List, Tuple, Union + from ohre.abcre.dis.AsmTypes import AsmTypes from ohre.abcre.dis.DebugBase import DebugBase from ohre.misc import Log, utils class AsmArg(DebugBase): - def __init__(self, arg_type: AsmTypes = AsmTypes.UNKNOWN, name: str = "", value=None, obj_ref=None): + def __init__(self, arg_type: AsmTypes = AsmTypes.UNKNOWN, + name: str = "", value=None, obj_ref=None, paras_len: int = None): self.type = arg_type # name: e.g. for v0, type is VAR, name is v0(stored without truncating the prefix v) self.name: str = name # value: may be set in the subsequent analysis self.value = value self.obj_ref = obj_ref + self.paras_len: Union[int, None] = paras_len # for method object, store paras len here @property def len(self): @@ -20,7 +24,7 @@ def __len__(self) -> int: return self.len @classmethod - def build_arg(cls, s: str): + def build_arg(cls, s: str): # return VAR v0 v1... or ARG a0 a1... assert isinstance(s, str) and len(s) > 0 if (s.startswith("v")): return AsmArg(AsmTypes.VAR, s) @@ -45,6 +49,8 @@ def _debug_str(self): out += f"({self.value})" if (self.obj_ref is not None): out += f"//{self.obj_ref}" + if (self.paras_len is not None): + out += f"(paras_len={self.paras_len})" return out def _debug_vstr(self): diff --git a/ohre/abcre/dis/AsmLiteral.py b/ohre/abcre/dis/AsmLiteral.py index 2af429a..3adb44d 100644 --- a/ohre/abcre/dis/AsmLiteral.py +++ b/ohre/abcre/dis/AsmLiteral.py @@ -14,32 +14,37 @@ def __init__(self, lines: List[str]): self.address = int(first_line_parts[1], 16) self.module_request_array: Dict = None self.module_tags: List[Dict] = None - if (len(lines) == 1): + try: + for s in lines: + idx = s.find("MODULE_REQUEST_ARRAY: {") + if (idx >= 0): + self._process_module_request_array(lines) + return self._process_normal_literal(lines) - else: - self._process_module_request_array(lines) + except Exception as e: + Log.error(f"init ERROR in AsmLiteral, e {e}, lines {lines}") def _process_normal_literal(self, lines: List[str]): literal_content = ' '.join(lines) - s_idx = literal_content.find("{")+1 + s_idx = literal_content.find("{") + 1 e_idx = literal_content.find("[") element_amount_str = literal_content[s_idx:e_idx].strip() assert element_amount_str.isdigit(), f"Expected a digit for element amount, got {element_amount_str}" element_amount = int(element_amount_str) - s_idx = literal_content.find("[")+1 + s_idx = literal_content.find("[") + 1 e_idx = literal_content.find("]") element_content = literal_content[s_idx:e_idx] array_split_list = [x.strip() for x in element_content.strip().split(',') if len(x) > 0] - + method_dict = {} if 'method' in element_content and 'method_affiliate' in element_content: cnt = 0 while cnt < len(array_split_list): if 'string' in array_split_list[cnt]: method_string = array_split_list[cnt].split(':')[1].strip()[1:-1] - method_name = array_split_list[cnt+1].split(':')[1].strip() - method_aff = array_split_list[cnt+2].split(':')[1].strip() + method_name = array_split_list[cnt + 1].split(':')[1].strip() + method_aff = array_split_list[cnt + 2].split(':')[1].strip() method_dict[method_string] = {'method': method_name, 'method_affiliate': method_aff} cnt += 3 else: @@ -50,7 +55,7 @@ def _process_normal_literal(self, lines: List[str]): cnt = 0 while cnt < len(array_split_list): variable_string = array_split_list[cnt].split(':')[1].strip()[1:-1] - variable_value = array_split_list[cnt+1] + variable_value = array_split_list[cnt + 1] if 'null_value' in variable_value: variable_value = 'null_value' else: @@ -93,7 +98,7 @@ def _process_module_request_array(self, lines: List[str]): kv_s = module_tag_line.split(",") d = dict() for kv in kv_s: - key, value = utils.find_single_kv(kv.strip(), ":") + key, value = utils.find_single_kv(kv, ":") if (key is not None and value is not None): d[key] = value if (len(d)): diff --git a/ohre/abcre/dis/AsmMethod.py b/ohre/abcre/dis/AsmMethod.py index b725350..5dc052a 100644 --- a/ohre/abcre/dis/AsmMethod.py +++ b/ohre/abcre/dis/AsmMethod.py @@ -20,14 +20,18 @@ def __init__(self, slotNumberIdx, lines: List[str]): self.method_name: str = "" # TODO: split it accurately self.method_type: str = "" self.args: List = list() + self._process_method_1st_line(lines[0].strip()) self.code_blocks: Union[CodeBlocks, None] = None - self.code_blocks = CodeBlocks(self._process_method(lines)) + self.code_blocks = CodeBlocks(self._process_method_inst(lines)) # for nac tac analysis self.cur_module: str = "" - def _process_1st_line(self, line: str): + def _split_class_method_name(self, record_names): + pass # TODO: use record_names to split + + def _process_method_1st_line(self, line: str): parts = line.split(" ") assert parts[0] == ".function" self.return_type = parts[1].strip() @@ -59,9 +63,8 @@ def _process_1st_line(self, line: str): ty, name = arg_pair.strip().split(" ") self.args.append((ty, name)) - def _process_method(self, lines: List[str]) -> List[List[str]]: + def _process_method_inst(self, lines: List[str]) -> List[List[str]]: insts = list() - self._process_1st_line(lines[0].strip()) for line in lines[1:]: line = line.strip() if (line.endswith(":")): @@ -95,9 +98,8 @@ def _process_common_inst(self, line: str) -> List[str]: return ret def _debug_str(self) -> str: - out = f"AsmMethod: {self.slotNumberIdx} {self.method_type} {self.class_method_name} \ -ret {self.return_type} file: {self.file_name}\n\ -\targs({len(self.args)}) {self.args} code_blocks({len(self.code_blocks)})" + out = f"AsmMethod: {self.slotNumberIdx} {self.class_method_name} {self.method_type} \ +ret {self.return_type} [{self.file_name}] args({len(self.args)}) {self.args} cbs({len(self.code_blocks)})" return out def _debug_vstr(self) -> str: diff --git a/ohre/abcre/dis/AsmString.py b/ohre/abcre/dis/AsmString.py index 9e7f449..1a61a7e 100644 --- a/ohre/abcre/dis/AsmString.py +++ b/ohre/abcre/dis/AsmString.py @@ -6,13 +6,19 @@ class AsmString(DebugBase): - def __init__(self, line: str): + def __init__(self, lines: List[str]): + line = "" + for s in lines: + line += s + line = line.strip() + assert line[0] == "[" and line[-1] == "]" + line = line[1:-1] idx = line.find(", ") assert idx > 2 and idx < len(line) - 2 self.offset = int(line[:idx].split(":")[1], 16) - remain_line = line[idx + 2:] - idx2 = remain_line.find(":") - self.name_value = remain_line[idx2 + 1:] + line_remain = line[idx + 2:] + idx2 = line_remain.find(":") + self.name_value = line_remain[idx2 + 1:] def _debug_str(self): out = f"AsmString({hex(self.offset)}) {len(self.name_value)} {self.name_value}" diff --git a/ohre/abcre/dis/AsmTypes.py b/ohre/abcre/dis/AsmTypes.py index e0716ad..af71463 100644 --- a/ohre/abcre/dis/AsmTypes.py +++ b/ohre/abcre/dis/AsmTypes.py @@ -15,6 +15,7 @@ class AsmTypes(BaseEnum): LABEL = "label" # AsmArg: value not valid STR = "str" MODULE = "module" + METHOD_OBJ = "method_obj" UNDEFINED = "undefined" UNKNOWN = "unknown" # default value in this proj diff --git a/ohre/abcre/dis/DisFile.py b/ohre/abcre/dis/DisFile.py index 5fdcd2b..c5c6752 100644 --- a/ohre/abcre/dis/DisFile.py +++ b/ohre/abcre/dis/DisFile.py @@ -1,3 +1,4 @@ +from threading import Thread from typing import Any, Dict, Iterable, List, Tuple, Union from ohre.abcre.dis.AsmLiteral import AsmLiteral @@ -28,44 +29,47 @@ class DisFile(DebugBase): def __init__(self, value): self.source_binary_name: str = "" self.language: str = "" - self.lines: List[str] = list() # TODO: delete it, dont store self.literals: List[AsmLiteral] = list() self.records: List[AsmRecord] = list() self.methods: List[AsmMethod] = list() self.asmstrs: List[AsmString] = list() + lines: List[str] = list() if (isinstance(value, str)): file = open(value, "r", encoding="utf-8", errors="ignore") for line in file: - self.lines.append(line) + lines.append(line) file.close() else: Log.error(f"DisFile init ERROR: value type NOT supported, {type(value)} {value}") - self._dis_process_main() + self._dis_process_main(lines) - def _dis_process_main(self): + def _dis_process_main(self, lines: List[str]): + process_list: List[Thread] = [Thread(target=self._read_disheader, args=(0, lines))] l_n = 0 # line number - state = STATE.INIT - while (l_n < len(self.lines)): - Log.info(f"DisFile processing: state {state} line-{l_n}: {self.lines[l_n].rstrip()}") - if (state == STATE.INIT): - state, l_n = self._read_disheader(l_n) - elif (state == STATE.NEW_SEC): - state, l_n = self._read_section_type(l_n) - elif (state == STATE.LITERALS): - state, l_n = self._read_literals(l_n) - elif (state == STATE.RECORDS): - state, l_n = self._read_records(l_n) - elif (state == STATE.METHODS): - state, l_n = self._read_methods(l_n) - elif (state == STATE.STRING): - state, l_n = self._read_strings(l_n) - else: - Log.error(f"state ERROR, state {state} l_n {l_n}") - return - Log.info(f"DisFile process END, l_n {l_n} should >= {len(self.lines)}") - - def _read_section_type(self, l_n) -> Tuple[int, int]: - line: str = self.lines[l_n].strip() + while (l_n < len(lines)): + if (_is_delimiter(lines[l_n].strip())): + l_n += 1 + state, l_n = self._read_section_type(l_n, lines) + if (state == STATE.LITERALS): + process_list.append(Thread(target=self._read_literals, args=(l_n, lines))) + elif (state == STATE.RECORDS): + process_list.append(Thread(target=self._read_records, args=(l_n, lines))) + elif (state == STATE.METHODS): + process_list.append(Thread(target=self._read_methods, args=(l_n, lines))) + elif (state == STATE.STRING): + process_list.append(Thread(target=self._read_strings, args=(l_n, lines))) + else: + Log.error(f"state ERROR, state {state} l_n {l_n}") + l_n += 1 + Log.info(f"DisFile process threads START, l_n {l_n} should >= {len(lines)}") + for process in process_list: + process.start() + for process in process_list: + process.join() # wait for all process + Log.info(f"DisFile process END") + + def _read_section_type(self, l_n: int, lines: List[str]) -> Tuple[int, int]: + line: str = lines[l_n].strip() if (line.startswith("# ") and len(line) > 3): if (line[2:] == "LITERALS"): return STATE.LITERALS, l_n + 1 @@ -76,13 +80,13 @@ def _read_section_type(self, l_n) -> Tuple[int, int]: if (line[2:] == "STRING"): return STATE.STRING, l_n + 1 Log.error(f"cannot determint what section is, line: {line}") - return None, len(self.lines) + return None, len(lines) - def _read_disheader(self, l_n) -> Tuple[int, int]: - while (l_n < len(self.lines)): - line: str = self.lines[l_n].strip() + def _read_disheader(self, l_n: int, lines: List[str]): + while (l_n < len(lines)): + line: str = lines[l_n].strip() if (_is_delimiter(line)): - return STATE.NEW_SEC, l_n + 1 + return elif (line.startswith("# ")): if ("source binary:" in line): self.source_binary_name = line.split(":")[1].strip() @@ -94,31 +98,30 @@ def _read_disheader(self, l_n) -> Tuple[int, int]: Log.error(f"ERROR in _read_disheader, else hit. line {line}") l_n += 1 - def _read_literals(self, l_n: int) -> Tuple[int, int]: - while (l_n < len(self.lines)): - line: str = self.lines[l_n].strip() + def _read_literals(self, l_n: int, lines: List[str]): + while (l_n < len(lines)): + line: str = lines[l_n].strip() if (_is_delimiter(line)): - return STATE.NEW_SEC, l_n + 1 + return parts = line.split(" ") if (parts[0].isdigit()): - l_idx, n_idx = utils.find_matching_symbols_multi_line(self.lines[l_n:], "{") + l_idx, n_idx = utils.find_matching_symbols_multi_line(lines[l_n:], "{") if (l_idx is not None): - asm_lit = AsmLiteral(self.lines[l_n:l_n + l_idx + 1]) + asm_lit = AsmLiteral(lines[l_n:l_n + l_idx + 1]) self.literals.append(asm_lit) l_n += l_idx + 1 else: l_n += 1 - return None, l_n + 1 - def _read_records(self, l_n) -> Tuple[int, int]: - while (l_n < len(self.lines)): - line: str = self.lines[l_n].strip() + def _read_records(self, l_n: int, lines: List[str]): + while (l_n < len(lines)): + line: str = lines[l_n].strip() if (_is_delimiter(line)): - return STATE.NEW_SEC, l_n + 1 - elif (line.strip().startswith(".record")): + return + elif (line.startswith(".record")): lines_record: List[str] = list() - while (l_n < len(self.lines)): # find "}" - line_rec: str = self.lines[l_n].rstrip() + while (l_n < len(lines)): # find "}" + line_rec: str = lines[l_n].rstrip() lines_record.append(line_rec) l_n += 1 if ("}" in line_rec): @@ -127,22 +130,21 @@ def _read_records(self, l_n) -> Tuple[int, int]: self.records.append(rec) else: l_n += 1 - return None, l_n + 1 - def _read_methods(self, l_n) -> Tuple[int, int]: - while (l_n < len(self.lines)): - line: str = self.lines[l_n].strip() + def _read_methods(self, l_n: int, lines: List[str]): + while (l_n < len(lines)): + line: str = lines[l_n].strip() if (_is_delimiter(line)): - return STATE.NEW_SEC, l_n + 1 + return elif (line == "L_ESSlotNumberAnnotation:"): l_n += 1 - line: str = self.lines[l_n].strip() + line: str = lines[l_n].strip() parts = line.strip().split(" ") slotNumberIdx = int(parts[-2], 16) l_n += 1 lines_method: List[str] = list() - while (l_n < len(self.lines)): # find "}" - line_method: str = self.lines[l_n].rstrip() + while (l_n < len(lines)): # find "}" + line_method: str = lines[l_n].rstrip() lines_method.append(line_method) l_n += 1 if ("}" == line_method): @@ -151,36 +153,40 @@ def _read_methods(self, l_n) -> Tuple[int, int]: self.methods.append(method) else: l_n += 1 - return None, l_n + 1 - def _read_strings(self, l_n) -> Tuple[int, int]: - while (l_n < len(self.lines)): - line: str = self.lines[l_n].strip() + def _read_strings(self, l_n: int, lines: List[str]): + while (l_n < len(lines)): + line: str = lines[l_n].strip() if (_is_delimiter(line)): - return STATE.NEW_SEC, l_n + 1 + return elif (len(line) == 0): - pass - elif (line.startswith("[") and line.endswith("]") and len(line) > 6): - asmstr = AsmString(line[1:-1]) - self.asmstrs.append(asmstr) + l_n += 1 + elif (line.startswith("[")): # single or multi line + l_idx, n_idx = utils.find_matching_symbols_multi_line(lines[l_n:], "[") + if (l_idx is not None): + asmstr = AsmString(lines[l_n:l_n + l_idx + 1]) + self.asmstrs.append(asmstr) + l_n += l_idx + 1 else: - Log.error(f"ERROR in _read_strings, else hit. line {line}") - l_n += 1 + Log.error(f"ERROR in _read_strings, else hit. l_n {l_n} line {line}") + l_n += 1 return None, l_n + 1 def _debug_str(self) -> str: - out = f"DisFile: {self.source_binary_name} language {self.language} lines({len(self.lines)}) \ + out = f"DisFile: {self.source_binary_name} language {self.language} \ literals({len(self.literals)}) records({len(self.records)}) methods({len(self.methods)}) asmstrs({len(self.asmstrs)})" return out def _debug_vstr(self) -> str: out = self._debug_str() + "\n" + for lit in self.literals: + out += f">> {lit._debug_vstr()}\n" for rec in self.records: out += f">> {rec._debug_vstr()}\n" for method in self.methods: out += f">> {method._debug_vstr()}\n" for asmstr in self.asmstrs: - out += f">> {asmstr}\n" + out += f">> {asmstr._debug_vstr()}\n" return out def get_literal_by_addr(self, addr: int) -> Union[AsmLiteral, None]: diff --git a/ohre/abcre/dis/NACtoTAC.py b/ohre/abcre/dis/NACtoTAC.py index a3726cc..1af0997 100644 --- a/ohre/abcre/dis/NACtoTAC.py +++ b/ohre/abcre/dis/NACtoTAC.py @@ -84,7 +84,7 @@ def toTAC(self, nac: NAC, asm_method: AsmMethod, dis_file: DisFile) -> Union[TAC # === inst: call instructions # START if (nac.op == "callthis1"): pass - if (nac.op == "callargs1"): + if (nac.op == "callarg1"): return TAC.tac_call( arg_len=AsmArg(AsmTypes.IMM, value=1), paras=[AsmArg.build_arg(nac.args[1])], @@ -116,9 +116,9 @@ def toTAC(self, nac: NAC, asm_method: AsmMethod, dis_file: DisFile) -> Union[TAC # === inst: dynamic return # START if (nac.op == "returnundefined"): - pass + return TAC.tac_return(AsmArg(AsmTypes.UNDEFINED)) if (nac.op == "return"): - pass + return TAC.tac_return(AsmArg(AsmTypes.ACC)) # === inst: dynamic return # END # === inst: object visitors # START @@ -127,6 +127,11 @@ def toTAC(self, nac: NAC, asm_method: AsmMethod, dis_file: DisFile) -> Union[TAC AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.STR, value=nac.args[1]), log=f"arg0: {nac.args[0]} todo: check ldobjbyname") + if (nac.op == "tryldglobalbyname"): + return TAC.tac_assign( + AsmArg(AsmTypes.ACC), + AsmArg(AsmTypes.STR, value=nac.args[1]), + log=f"arg0: {nac.args[0]} todo: check tryldglobalbyname, not throw now") if (nac.op == "ldexternalmodulevar"): index = int(nac.args[0], base=16) module_name = dis_file.get_external_module_name(index, asm_method.file_name, asm_method.class_method_name) @@ -135,12 +140,17 @@ def toTAC(self, nac: NAC, asm_method: AsmMethod, dis_file: DisFile) -> Union[TAC return TAC.tac_import(AsmArg(AsmTypes.MODULE, name=module_name)) else: asm_method.set_cur_module("module load failed") - if (nac.op == "tryldglobalbyname"): - pass if (nac.op == "copyrestargs"): return TAC.tac_unknown([AsmArg(AsmTypes.IMM, value=nac.args[0])], log="todo: copyrestargs imm:u8") # === inst: object visitors # END +# === inst: definition instuctions # START + if (nac.op == "definefunc"): + return TAC.tac_assign( + AsmArg(AsmTypes.ACC), + AsmArg(AsmTypes.METHOD_OBJ, value=nac.args[1], paras_len=int(nac.args[2], 16))) + # === inst: definition instuctions # END + Log.warn(f"toTAC failed, not support nac inst: {nac._debug_vstr()}", False) # to error when done return TAC.tac_unknown( [AsmArg(AsmTypes.UNKNOWN, nac.args[i]) for i in range(len(nac.args))], diff --git a/ohre/abcre/dis/PandaReverser.py b/ohre/abcre/dis/PandaReverser.py index 6425585..bea6e31 100644 --- a/ohre/abcre/dis/PandaReverser.py +++ b/ohre/abcre/dis/PandaReverser.py @@ -32,6 +32,9 @@ def trans_NAC_to_TAC(self, method_id: int = -1, method_name: str = None): else: pass + def method_len(self): + return len(self.dis_file.methods) + def _debug_str(self) -> str: out = f"PandaReverser: {self.dis_file}" return out diff --git a/ohre/abcre/dis/TAC.py b/ohre/abcre/dis/TAC.py index b19558d..75055aa 100644 --- a/ohre/abcre/dis/TAC.py +++ b/ohre/abcre/dis/TAC.py @@ -40,9 +40,9 @@ def tac_uncn_jmp(cls, dst: AsmArg, log: str = ""): def tac_import(cls, module_name: AsmArg, log: str = ""): return TAC(TACTYPE.IMPORT, [AsmArg(AsmTypes.ACC), module_name], log=log) - @classmethod # TODO: return - def tac_return(cls, paras: List[AsmArg] = None, log: str = ""): - return TAC(TACTYPE.UNKNOWN, paras, log=log) + @classmethod + def tac_return(cls, val: AsmArg, log: str = ""): + return TAC(TACTYPE.RETURN, [val], log=log) @classmethod def tac_call(cls, arg_len: AsmArg = None, paras: List[AsmArg] = None, this: AsmArg = None, log: str = ""): @@ -85,11 +85,11 @@ def _debug_vstr(self): elif (self.optype == TACTYPE.IMPORT and len(self.args) >= 2): out += f"{self.args[0]._debug_vstr()} = {self.args[1]._debug_vstr()}" elif (self.optype == TACTYPE.CALL and len(self.args) >= 2): - out += f"{self.args[0]._debug_vstr()} args({self.args[1].value})" - for i in range(self.args[1].value): - out += f" {self.args[i + 2]._debug_vstr()}" if (self.this is not None and len(self.this) > 0): - out += f" // this={self.this}" + out += f"{self.this}->" + out += f"{self.args[0]._debug_vstr()} args({self.args[1].value}):" + for i in range(self.args[1].value): + out += f" {self.args[i + 2]._debug_vstr()}," else: out += self._args_and_rop_common_debug_str() if (self.log is not None and len(self.log) > 0):