From a61a8fc456db3db8616e676f095b3a972c2ba908 Mon Sep 17 00:00:00 2001 From: koki Date: Sat, 21 Dec 2024 18:54:36 +0800 Subject: [PATCH] ISA start --- .gitignore | 1 + examples/dis_demo.py | 6 +- ohre/abcre/dis/AsmMethod.py | 62 +++++++++++-------- ohre/abcre/dis/AsmString.py | 6 +- ohre/abcre/dis/ISA_reader.py | 111 +++++++++++++++++++++++++++++++++++ ohre/abcre/dis/NAC.py | 43 ++++++++++++++ ohre/abcre/dis/NACBlock.py | 33 +++++++++++ ohre/abcre/dis/NACBlocks.py | 27 +++++++++ ohre/abcre/dis/NACTYPE.py | 20 +++++++ ohre/misc/Log.py | 8 +-- ohre/misc/utils.py | 75 +++++++++++++++++++++++ 11 files changed, 357 insertions(+), 35 deletions(-) create mode 100644 ohre/abcre/dis/ISA_reader.py create mode 100644 ohre/abcre/dis/NAC.py create mode 100644 ohre/abcre/dis/NACBlock.py create mode 100644 ohre/abcre/dis/NACBlocks.py create mode 100644 ohre/abcre/dis/NACTYPE.py diff --git a/.gitignore b/.gitignore index ab0909a..836191b 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ rules_local/ *.out *.dis *.log +isa.yaml tmp/ tmp_extract/ tmp_hap_extract/ diff --git a/examples/dis_demo.py b/examples/dis_demo.py index b58a0a9..966ceae 100644 --- a/examples/dis_demo.py +++ b/examples/dis_demo.py @@ -17,6 +17,6 @@ print(f"> {dis_file}") # print(f"\n> {dis_file.debug_deep()}") for method in dis_file.methods: - print(f">> {method.debug_short()}") - for asmstr in dis_file.asmstrs: - print(f">> {asmstr}") + print(f">> {method.debug_deep()}") + # for asmstr in dis_file.asmstrs: + # print(f">> {asmstr}") diff --git a/ohre/abcre/dis/AsmMethod.py b/ohre/abcre/dis/AsmMethod.py index 62642b2..58aa4a8 100644 --- a/ohre/abcre/dis/AsmMethod.py +++ b/ohre/abcre/dis/AsmMethod.py @@ -1,7 +1,11 @@ from typing import Any, Dict, Iterable, List, Tuple from ohre.abcre.dis.AsmTypes import AsmTypes +from ohre.misc import utils from ohre.misc import Log +from ohre.abcre.dis.NAC import NAC +from ohre.abcre.dis.NACBlock import NACBlock +from ohre.abcre.dis.NACBlocks import NACBlocks class AsmMethod: @@ -14,8 +18,9 @@ def __init__(self, slotNumberIdx, lines: List[str]): self.class_func_name: str = "" self.func_type: str = "" self.args: List = list() - self.insts: List = list() - self._process_method(lines) + self.nac_blocks: NACBlocks | None = None + insts = self._process_method(lines) + self.nac_blocks = NACBlocks(insts) def _process_1st_line(self, line: str): parts = line.split(" ") @@ -46,46 +51,51 @@ def _process_1st_line(self, line: str): ty, name = arg_pair.strip().split(" ") self.args.append((ty, name)) - def _process_method(self, lines: List[str]): + def _process_method(self, lines: List[str]) -> List[List[str]]: + insts = list() self._process_1st_line(lines[0].strip()) for line in lines[1:]: line = line.strip() if (line.endswith(":")): - if (len(line.split(" ")) == 1): + if (len(line.split(" ")) == 1): # single str in a single line endswith ":", maybe label? tu = [line] - self.insts.append(tu) + insts.append(tu) else: Log.error(f"ERROR: {line} NOT tag?", True) - elif (len(line) == 0): + elif (len(line) == 0): # skip empty line continue - elif (line == "}"): - return - else: - tu = list(line.split(" ")) - for i in range(len(tu)): - if (tu[i].endswith(",")): - tu[i] = tu[i][:-1] - self.insts.append(tu) + elif (line == "}"): # process END + return insts + else: # common situation + tu = self._process_common_inst(line) + insts.append(tu) + return insts + + def _process_common_inst(self, line: str) -> List[str]: + line = line.strip() + idx = line.find(" ") + if (idx < 0): + ret = [line[:]] + return ret + ret = [line[:idx]] # opcode + idx += 1 + while (idx < len(line)): + start_idx = idx + idx = utils.find_next_delimiter(line, start_idx) + ret.append(line[start_idx: idx].strip()) + idx = idx + 1 + print(f"final ret({len(ret)}) {ret}") + return ret def __str__(self): return self.debug_short() def debug_short(self) -> str: out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} file: {self.file_name}\n\ -args({len(self.args)}) {self.args} insts({len(self.insts)})" +args({len(self.args)}) {self.args} nac_blocks({self.nac_blocks.len})" return out def debug_deep(self) -> str: - out_insts = "" - for line_num in range(len(self.insts)): - inst = self.insts[line_num] - out = f"{line_num}\t{inst[0]} " - for i in range(1, len(inst)): - if (i != len(inst) - 1): - out += f"{inst[i]}, " - else: - out += f"{inst[i]}" - out_insts += f"{out}\n" out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} file: {self.file_name}\n\ -args({len(self.args)}) {self.args} insts({len(self.insts)})\n{out_insts}" +args({len(self.args)}) {self.args} nac_blocks({self.nac_blocks.len})\n{self.nac_blocks.debug_deep()}" return out diff --git a/ohre/abcre/dis/AsmString.py b/ohre/abcre/dis/AsmString.py index e9f4ff7..e96b83b 100644 --- a/ohre/abcre/dis/AsmString.py +++ b/ohre/abcre/dis/AsmString.py @@ -9,11 +9,13 @@ def __init__(self, line: str): idx = line.find(", ") assert idx > 2 and idx < len(line) - 2 self.offset = int(line[:idx].split(":")[1], 16) - self.name_value = line[idx + 2:].split(":")[1] + remain_line = line[idx + 2:] + idx2 = remain_line.find(":") + self.name_value = remain_line[idx2 + 1:] def __str__(self): return self.debug_deep() def debug_deep(self): - out = f"AsmString {hex(self.offset)} {self.name_value}" + out = f"AsmString({hex(self.offset)}) {len(self.name_value)} {self.name_value}" return out diff --git a/ohre/abcre/dis/ISA_reader.py b/ohre/abcre/dis/ISA_reader.py new file mode 100644 index 0000000..f4a8af2 --- /dev/null +++ b/ohre/abcre/dis/ISA_reader.py @@ -0,0 +1,111 @@ +import json +import os +from typing import Any, Dict, Iterable, List, Tuple + +import yaml + +import ohre +from ohre.misc import Log, utils + + +class ISA: + def __init__(self, isa_file_path: str): + self.ori_d: Dict = utils.read_dict_from_yaml_file(isa_file_path) + assert self.ori_d is not None + + self.prefixes: Dict | None = None + self.prefixes = self._get_prefixes_dict() + assert self.prefixes is not None + Log.info(f"[ISA] self.prefixes {len(self.prefixes)} {self.prefixes}") + + self.opstr2infod: Dict[str, Dict] | None = None + self.opstr2infod = self._get_opstr_dict() + assert self.opstr2infod is not None + Log.info(f"[ISA] self.opstr2infod {len(self.opstr2infod)} keys: {self.opstr2infod.keys()}") + + def _get_prefixes_dict(self) -> Dict: + if (self.prefixes is not None): + return self.prefixes + ret = {} + for sub_d in self.ori_d["prefixes"]: + ret[sub_d["name"]] = {"description": sub_d["description"], "opcode_idx": sub_d["opcode_idx"]} + return ret + + def _get_prefix_opcode(self, prefix: str) -> int: + if (prefix in self.prefixes.keys()): + return self.prefixes[prefix]["opcode_idx"] + return -1 + + def _get_opstr_dict(self) -> Dict[str, Dict]: + ret = dict() + for group in self.ori_d["groups"]: + title = group["title"] if "title" in group.keys() else None + assert len(title) > 0 and isinstance(title, str) + description: str = group["description"].strip() if "description" in group.keys() else None + verification: List | None = group["verification"] if "verification" in group.keys() else None + exceptions: List | None = group["exceptions"] if "exceptions" in group.keys() else None + properties: List | None = group["properties"] if "properties" in group.keys() else None + namespace: str = group["namespace"].strip() if "namespace" in group.keys() else None + pseudo: str = group["pseudo"].strip() if "pseudo" in group.keys() else None + semantics: str = group["semantics"].strip() if "semantics" in group.keys() else None + + assert "instructions" in group.keys() + for ins in group["instructions"]: + assert "sig" in ins.keys() and "opcode_idx" in ins.keys() + opstr = ins["sig"].split(" ")[0].strip() + opcode_idx = ins["opcode_idx"] + + acc = ins["acc"] if "acc" in ins.keys() else None + format = ins["format"] if "format" in ins.keys() else None + prefix = ins["prefix"] if "prefix" in ins.keys() else None + + if (prefix is not None): # final_opcode = prefix_opcode|op_code # concat, not 'or' + prefix_opcode = self._get_prefix_opcode(prefix) + assert prefix_opcode != -1 + opcode_idx = [(prefix_opcode << 8) + op_code for op_code in opcode_idx] + + ret[opstr] = { + "sig": ins["sig"], + "acc": acc, "opcode_idx": opcode_idx, "prefix": prefix, "format": format, "title": title, + "description": description, "verification": verification, "exceptions": exceptions, + "properties": properties, "namespace": namespace, "pseudo": pseudo, "semantics": semantics} + return ret + + def get_opcodes(self, opstr: str) -> List | None: + opcode_info_d = self.get_opcode_info_dict(opstr) + if (opcode_info_d is None): + return None + else: + if ("opcode_idx" in opcode_info_d.keys()): + return opcode_info_d["opcode_idx"] + else: + Log.warn(f"[ISA] opstr {opstr}, opcode_idx not in {opcode_info_d.keys()}") + return None + + def get_opcode_info_dict(self, opstr: str) -> Dict | None: + if opstr in self.opstr2infod.keys(): + return self.opstr2infod[opstr] + else: + Log.warn(f"[ISA] opstr NOT hit directly, opstr {opstr}, remove prefix and match again", True) + for key_opstr in self.opstr2infod.keys(): + opstr_rhs = key_opstr + tmp = opstr_rhs.split(".") + if (len(tmp) > 1 and opstr == tmp[1]): + Log.warn(f"[ISA] opstr change: {opstr} -> {key_opstr}", True) + return self.opstr2infod[key_opstr] + return None + + +if __name__ == "__main__": + ohre.set_log_print(True) + d = utils.read_dict_from_yaml_file(os.path.join(os.path.dirname(os.path.abspath(__file__)), "isa.yaml")) + isa = ISA(os.path.join(os.path.dirname(os.path.abspath(__file__)), "isa.yaml")) + # print(json.dumps(isa.ori_d["groups"], indent=4)) + assert isa.get_opcodes("deprecated.getiteratornext") == [0xfc02] + assert isa.get_opcodes("callruntime.notifyconcurrentresult") == [0xfb00] + for ins_str in ["mov", "callruntime.definefieldbyindex", "isin"]: + print(f"{ins_str}: {utils.hexstr(isa.get_opcodes(ins_str))} {isa.get_opcode_info_dict(ins_str)}") + title_set = set() + for opstr in isa.opstr2infod.keys(): + title_set.add(isa.opstr2infod[opstr]["title"]) + print(f"{len(title_set)} {title_set}") diff --git a/ohre/abcre/dis/NAC.py b/ohre/abcre/dis/NAC.py new file mode 100644 index 0000000..1620ecf --- /dev/null +++ b/ohre/abcre/dis/NAC.py @@ -0,0 +1,43 @@ +from typing import Any, Dict, Iterable, List, Tuple +from ohre.abcre.dis.NACTYPE import NACTYPE + + +class NAC(): # N Address Code + # Native representation of ark_disasm-ed ArkTS bytecode + # corresponding to a single line in a panda function + + def __init__(self, op_args: List[str]): + assert len(op_args) > 0 + self.op = op_args[0] + self.type = NACTYPE.get_NAC_type(self.op) + self.args = list() + for i in range(1, len(op_args)): + self.args.append(op_args[i]) + + def __str__(self): + return self.debug_short() + + def _is_std_nac(self): + std_nac_set = {NACTYPE.ASSIGN, NACTYPE.COND_JMP, NACTYPE.UNCN_JMP, + NACTYPE.CALL, NACTYPE.COND_THROW, NACTYPE.UNCN_THROW, NACTYPE.RETURN} + if (self.type in std_nac_set): + return True + return False + + def debug_short(self): + out = f"{self.op} " + for i in range(len(self.args)): + if (i == len(self.args) - 1): + out += f"{self.args[i]}" + else: + out += f"{self.args[i]}, " + return out + + def debug_deep(self): + out = f"({NACTYPE.get_code_name(self.type)}) {self.op} " + for i in range(len(self.args)): + if (i == len(self.args) - 1): + out += f"{self.args[i]}" + else: + out += f"{self.args[i]}, " + return out diff --git a/ohre/abcre/dis/NACBlock.py b/ohre/abcre/dis/NACBlock.py new file mode 100644 index 0000000..29e0f13 --- /dev/null +++ b/ohre/abcre/dis/NACBlock.py @@ -0,0 +1,33 @@ +from typing import Any, Dict, Iterable, List, Tuple +from ohre.abcre.dis.NAC import NAC +from ohre.abcre.dis.NACTYPE import NACTYPE +import copy + + +class NACBLOCK_LV: + NATIVE = 0 + LEVEL1 = 1 + LEVEL2 = 2 + + +class NACBlock(): + def __init__(self, insts: List[List[str]], level=NACBLOCK_LV.NATIVE): + assert len(insts) > 0 + self.nacs: List[NAC] = list() + self.level = level + for inst in insts: + assert len(inst) > 0 + self.nacs.append(NAC(inst)) + + def __str__(self): + return self.debug_short() + + def debug_short(self): + out = f"NACBlock: nacs {len(self.nacs)} lv {self.level}" + return out + + def debug_deep(self): + out = f"NACBlock: nacs {len(self.nacs)} lv {self.level}\n" + for i in range(len(self.nacs)): + out += f"{i}\t{self.nacs[i].debug_deep()}\n" + return out \ No newline at end of file diff --git a/ohre/abcre/dis/NACBlocks.py b/ohre/abcre/dis/NACBlocks.py new file mode 100644 index 0000000..946b500 --- /dev/null +++ b/ohre/abcre/dis/NACBlocks.py @@ -0,0 +1,27 @@ +from typing import Any, Dict, Iterable, List, Tuple +from ohre.abcre.dis.NACBlock import NACBlock +from ohre.abcre.dis.NAC import NAC +from ohre.abcre.dis.NACTYPE import NACTYPE +import copy + + +class NACBlocks(): + def __init__(self, insts: List[List[str]]): + self.nac_blocks: List[NACBlock] = [NACBlock(insts)] + + def __str__(self): + return self.debug_short() + + @property + def len(self): + return len(self.nac_blocks) + + def debug_short(self): + out = f"NACBlocks: block len {len(self.nac_blocks)}" + return out + + def debug_deep(self): + out = f"{self.debug_short()}\n" + for i in range(len(self.nac_blocks)): + out += f"{i}-block: {self.nac_blocks[i].debug_deep()}\n" + return out \ No newline at end of file diff --git a/ohre/abcre/dis/NACTYPE.py b/ohre/abcre/dis/NACTYPE.py new file mode 100644 index 0000000..983cd15 --- /dev/null +++ b/ohre/abcre/dis/NACTYPE.py @@ -0,0 +1,20 @@ +from ohre.abcre.enum.BaseEnum import BaseEnum + + +class NACTYPE(BaseEnum): + def __init__(self): + super().__init__() + ASSIGN = 0 # at most 3 arg + COND_JMP = 1 # 3 arg + UNCN_JMP = 2 # 1 arg # unconditional + CALL = 3 # 1 or more arg + COND_THROW = 4 # 3 arg + UNCN_THROW = 5 # 1 arg + RETURN = 6 # 1 arg + IMPORT = 11 + LABEL = 12 + UNKNOWN = 99 + + @classmethod + def get_NAC_type(cls, op: str) -> int: + return NACTYPE.UNKNOWN \ No newline at end of file diff --git a/ohre/misc/Log.py b/ohre/misc/Log.py index 3d80cb0..2370113 100644 --- a/ohre/misc/Log.py +++ b/ohre/misc/Log.py @@ -1,8 +1,8 @@ +import datetime import logging import os -from logging.handlers import RotatingFileHandler import platform -import datetime +from logging.handlers import RotatingFileHandler g_log = None DEBUG_LOCAL = True @@ -76,13 +76,13 @@ def warn(logstr, print_flag=True): def error(logstr, print_flag=True): - if (print_flag and get_logger().getEffectiveLevel() <= logging.ERROR): + if (get_logger().getEffectiveLevel() <= logging.ERROR): debug_print(logstr, "error") g_log.error(logstr) def critical(logstr, print_flag=True): - if (print_flag and get_logger().getEffectiveLevel() <= logging.CRITICAL): + if (get_logger().getEffectiveLevel() <= logging.CRITICAL): debug_print(logstr, "criti") g_log.critical(logstr) diff --git a/ohre/misc/utils.py b/ohre/misc/utils.py index 33ff089..389cb61 100644 --- a/ohre/misc/utils.py +++ b/ohre/misc/utils.py @@ -1,2 +1,77 @@ +from typing import Any, Dict, Iterable, List, Tuple +import yaml + + def is_uppercase_or_underscore(s: str): return all(c.isupper() or c.isdigit() or c == "_" for c in s) + + +def find_idx_in_list(l, ele): + for i in range(len(l)): + if (l[i] == ele): + return i + return -1 + + +def is_right_and_match_stack_top(stack_l: list, pair_left_char_l: list, pair_right_char_l: list, c) -> bool: + if (len(stack_l) == 0): + return False + l_idx = find_idx_in_list(pair_left_char_l, stack_l[-1]) + assert l_idx >= 0 + r_idx = find_idx_in_list(pair_right_char_l, c) + if (r_idx == l_idx): + return True + return False + + +def is_left(pair_left_char_l, c): + if (find_idx_in_list(pair_left_char_l, c) >= 0): + return True + return False + + +def find_next_delimiter(line: str, start_idx: int = 0, delimiter: str = ",", + pair_left_char_l: List = ["\"", "(", "[", "{"], + pair_right_char_l: List = ["\"", ")", "]", "}"]): + stack_l = list() + for idx in range(start_idx, len(line)): + if (is_right_and_match_stack_top(stack_l, pair_left_char_l, pair_right_char_l, line[idx])): + stack_l.pop() + elif (is_left(pair_left_char_l, line[idx])): + stack_l.append(line[idx]) + elif (line.find(delimiter, idx) == idx and len(stack_l) == 0): + return idx + return len(line) + + +def read_dict_from_yaml_file(f_name: str) -> dict: + ret = None + with open(f_name) as stream: + try: + ret = yaml.safe_load(stream) + except yaml.YAMLError as e: + print(f"read yaml failed, e:{e}") + return ret + + +def hexstr(value) -> str: + ret = "" + if isinstance(value, Iterable): + for i in range(len(value)): + if (i != len(value) - 1): + ret += f"{hex(value[i])}," + else: + ret += f"{hex(value[i])}" + elif (isinstance(value, int)): + ret = f"{hex(value)}" + else: + ret = f"unsupported_value_type! value:{value}" + return ret + + +if __name__ == "__main__": + temp = """newlexenvwithname 0x2, { 5 [ i32:2, string:"4newTarget", i32:0, string:"this", i32:1, ]}""" + idx = find_next_delimiter(temp, 17) + print(f"idx {idx} {temp[17: idx]}") + idx = find_next_delimiter(temp, 22) + print(f"idx {idx} {temp[22: idx]}")